diff --git a/supply/common/src/main/kotlin/fund/cyber/supply/CommonConfiguration.kt b/supply/common/src/main/kotlin/fund/cyber/supply/CommonConfiguration.kt new file mode 100644 index 00000000..631fd136 --- /dev/null +++ b/supply/common/src/main/kotlin/fund/cyber/supply/CommonConfiguration.kt @@ -0,0 +1,88 @@ +package fund.cyber.supply + +import fund.cyber.common.kafka.JsonSerializer +import fund.cyber.common.kafka.defaultProducerConfig +import fund.cyber.common.with +import fund.cyber.search.configuration.CHAIN_FAMILY +import fund.cyber.search.configuration.CHAIN_NAME +import fund.cyber.search.configuration.KAFKA_BROKERS +import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT +import fund.cyber.search.model.chains.ChainFamily +import fund.cyber.search.model.chains.ChainInfo +import fund.cyber.search.model.events.PumpEvent +import fund.cyber.search.model.events.supplyTopic +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.TopicConfig +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaAdmin +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.transaction.KafkaTransactionManager +import javax.annotation.PostConstruct + + +@Configuration +@ComponentScan("fund.cyber.supply") +class CommonConfiguration { + + @Value("\${$KAFKA_BROKERS:$KAFKA_BROKERS_DEFAULT}") + private lateinit var kafkaBrokers: String + + @Value("\${$CHAIN_FAMILY:}") + private lateinit var chainFamily: String + + @Value("\${$CHAIN_NAME:}") + private lateinit var chainName: String + + + @Bean + fun chainInfo() = ChainInfo(ChainFamily.valueOf(chainFamily), chainName) + + @Bean + fun producerFactory(): ProducerFactory { + + val config = defaultProducerConfig().with( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers + ) + return DefaultKafkaProducerFactory(config, JsonSerializer(), JsonSerializer()) + .apply { setTransactionIdPrefix(chainInfo().fullName + "_SUPPLY") } + } + + @Bean + fun transactionManager() = KafkaTransactionManager(producerFactory()) + + @Bean + fun kafkaTemplate() = KafkaTemplate(producerFactory()) + + + @Bean + fun kafkaAdmin(): KafkaAdmin { + val configs = mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers) + return KafkaAdmin(configs).apply { this.setFatalIfBrokerNotAvailable(true) } + } + + @PostConstruct + fun createTopics() { + + val kafkaClient = AdminClient.create(kafkaAdmin().config) + + val supplyTopicConfig = mapOf( + TopicConfig.RETENTION_BYTES_CONFIG to "52428800", + TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE + ) + + val supplyTopic = NewTopic(chainInfo().supplyTopic, 1, 1).configs(supplyTopicConfig) + + kafkaClient.createTopics(listOf(supplyTopic)) + kafkaClient.close() + } + + +} \ No newline at end of file diff --git a/supply/common/src/main/kotlin/fund/cyber/supply/common/CurrentSupplyProvider.kt b/supply/common/src/main/kotlin/fund/cyber/supply/common/CurrentSupplyProvider.kt new file mode 100644 index 00000000..2a0c9c95 --- /dev/null +++ b/supply/common/src/main/kotlin/fund/cyber/supply/common/CurrentSupplyProvider.kt @@ -0,0 +1,50 @@ +package fund.cyber.supply.common + +import fund.cyber.common.kafka.reader.SinglePartitionTopicLastItemsReader +import fund.cyber.search.configuration.GENESIS_SUPPLY +import fund.cyber.search.configuration.KAFKA_BROKERS +import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT +import fund.cyber.search.model.chains.ChainInfo +import fund.cyber.search.model.events.supplyTopic +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Component +import java.math.BigDecimal + + +private val log = LoggerFactory.getLogger(CurrentSupplyProvider::class.java)!! + +@Component +class CurrentSupplyProvider { + + @Value("\${$KAFKA_BROKERS:$KAFKA_BROKERS_DEFAULT}") + private lateinit var kafkaBrokers: String + + @Value("\${$GENESIS_SUPPLY:}") + private lateinit var genesisSupply: String + + @Autowired + lateinit var chainInfo: ChainInfo + + fun getLastCalculatedSupply(supplyClass: Class, genesisSupplyCreator: (genesis: BigDecimal) -> T): T { + + val topicReader = SinglePartitionTopicLastItemsReader( + kafkaBrokers = kafkaBrokers, topic = chainInfo.supplyTopic, + keyClass = Any::class.java, valueClass = supplyClass + ) + + val keyToValue = topicReader.readLastRecords(1).firstOrNull() + + return if (keyToValue != null) { + keyToValue.second + } else { + if (genesisSupply.trim().isEmpty()) { + log.error("Please specify env variable `GENESIS_SUPPLY`. " + + "For example, initial Ethereum supply is 72009990.50") + throw RuntimeException("`GENESIS_SUPPLY` is not provided") + } + genesisSupplyCreator(BigDecimal(genesisSupply)) + } + } +} \ No newline at end of file diff --git a/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/SupplyApplication.kt b/supply/ethereum/src/main/kotlin/fund/cyber/supply/SupplyApplication.kt similarity index 94% rename from supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/SupplyApplication.kt rename to supply/ethereum/src/main/kotlin/fund/cyber/supply/SupplyApplication.kt index 13888e16..d18d5ada 100644 --- a/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/SupplyApplication.kt +++ b/supply/ethereum/src/main/kotlin/fund/cyber/supply/SupplyApplication.kt @@ -1,4 +1,4 @@ -package fund.cyber.supply.ethereum +package fund.cyber.supply import org.springframework.boot.SpringApplication import org.springframework.boot.autoconfigure.SpringBootApplication diff --git a/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/ApplicationConfiguration.kt b/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/ApplicationConfiguration.kt index 962aa935..66fd2707 100644 --- a/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/ApplicationConfiguration.kt +++ b/supply/ethereum/src/main/kotlin/fund/cyber/supply/ethereum/ApplicationConfiguration.kt @@ -2,40 +2,25 @@ package fund.cyber.supply.ethereum import fund.cyber.common.kafka.DEFAULT_POLL_TIMEOUT import fund.cyber.common.kafka.JsonDeserializer -import fund.cyber.common.kafka.JsonSerializer import fund.cyber.common.kafka.defaultConsumerConfig -import fund.cyber.common.kafka.defaultProducerConfig -import fund.cyber.common.kafka.reader.SinglePartitionTopicLastItemsReader import fund.cyber.common.with -import fund.cyber.search.configuration.CHAIN_FAMILY -import fund.cyber.search.configuration.CHAIN_NAME -import fund.cyber.search.configuration.GENESIS_SUPPLY import fund.cyber.search.configuration.KAFKA_BROKERS import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT -import fund.cyber.search.model.chains.ChainFamily import fund.cyber.search.model.chains.ChainInfo import fund.cyber.search.model.ethereum.EthereumSupply import fund.cyber.search.model.events.PumpEvent import fund.cyber.search.model.events.blockPumpTopic -import fund.cyber.search.model.events.supplyTopic import fund.cyber.search.model.events.unclePumpTopic -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.clients.admin.NewTopic +import fund.cyber.supply.common.CurrentSupplyProvider import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaAdmin import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.BATCH import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler @@ -43,10 +28,9 @@ import org.springframework.kafka.listener.config.ContainerProperties import org.springframework.kafka.transaction.KafkaTransactionManager import java.math.BigDecimal import java.math.BigDecimal.ZERO -import javax.annotation.PostConstruct +const val MAX_RECORDS_BATCH_SIZE = 5000 -private val log = LoggerFactory.getLogger(ApplicationConfiguration::class.java)!! @Configuration class ApplicationConfiguration { @@ -54,112 +38,61 @@ class ApplicationConfiguration { @Value("\${$KAFKA_BROKERS:$KAFKA_BROKERS_DEFAULT}") private lateinit var kafkaBrokers: String - @Value("\${$CHAIN_FAMILY:}") - private lateinit var chainFamily: String + @Autowired + lateinit var chainInfo: ChainInfo - @Value("\${$CHAIN_NAME:}") - private lateinit var chainName: String + @Autowired + lateinit var kafkaTemplate: KafkaTemplate - @Value("\${$GENESIS_SUPPLY:}") - private lateinit var genesisSupply: String - - @Bean - fun chainInfo() = ChainInfo(ChainFamily.valueOf(chainFamily), chainName) - - @Bean - fun producerFactory(): ProducerFactory { - - val config = defaultProducerConfig().with( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers - ) - return DefaultKafkaProducerFactory(config, JsonSerializer(), JsonSerializer()) - .apply { setTransactionIdPrefix(chainInfo().fullName + "_SUPPLY") } - } - - - @Bean - fun transactionManager() = KafkaTransactionManager(producerFactory()) - - @Bean - fun kafkaTemplate() = KafkaTemplate(producerFactory()) + @Autowired + lateinit var kafkaTransactionManager: KafkaTransactionManager - - @Bean - fun kafkaAdmin(): KafkaAdmin { - val configs = mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers) - return KafkaAdmin(configs).apply { this.setFatalIfBrokerNotAvailable(true) } - } - - @PostConstruct - fun createTopics() { - - val kafkaClient = AdminClient.create(kafkaAdmin().config) - - val supplyTopicConfig = mapOf( - TopicConfig.RETENTION_BYTES_CONFIG to "52428800", - TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE - ) - - val supplyTopic = NewTopic(chainInfo().supplyTopic, 1, 1).configs(supplyTopicConfig) - - kafkaClient.createTopics(listOf(supplyTopic)) - kafkaClient.close() - } + @Autowired + lateinit var currentSupplyProvider: CurrentSupplyProvider @Bean fun blocksAndUnclesListenerContainerFactory(): KafkaMessageListenerContainer { val consumerConfig = consumerConfigs().apply { put(ConsumerConfig.GROUP_ID_CONFIG, "ethereum-supply-process") + put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_RECORDS_BATCH_SIZE) } val consumerFactory = DefaultKafkaConsumerFactory( consumerConfig, JsonDeserializer(PumpEvent::class.java), ByteArrayDeserializer() ) - val containerProperties = ContainerProperties(chainInfo().blockPumpTopic, chainInfo().unclePumpTopic).apply { + val containerProperties = ContainerProperties(chainInfo.blockPumpTopic, chainInfo.unclePumpTopic).apply { messageListener = CalculateEthereumSupplyProcess( - chainInfo = chainInfo(), currentSupply = getLastCalculatedSupply(), kafka = kafkaTemplate() + chainInfo = chainInfo, currentSupply = currentSupply(), kafka = kafkaTemplate ) pollTimeout = DEFAULT_POLL_TIMEOUT ackMode = BATCH - transactionManager = transactionManager() + transactionManager = kafkaTransactionManager setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) } return KafkaMessageListenerContainer(consumerFactory, containerProperties) } - private fun getLastCalculatedSupply(): EthereumSupply { - - val topicReader = SinglePartitionTopicLastItemsReader( - kafkaBrokers = kafkaBrokers, topic = chainInfo().supplyTopic, - keyClass = Any::class.java, valueClass = EthereumSupply::class.java - ) - - val keyToValue = topicReader.readLastRecords(1).firstOrNull() - return keyToValue?.second ?: genesisSupply() - } - - private fun genesisSupply(): EthereumSupply { - - if (genesisSupply.trim().isEmpty()) { - log.error("Please specify env variable `GENESIS_SUPPLY`. " + - "For example, initial Ethereum supply is 72009990.50") - throw RuntimeException("`GENESIS_SUPPLY` is not provided") - } - - return EthereumSupply( - blockNumber = 0, uncleNumber = 0, - totalSupply = BigDecimal(genesisSupply), genesisSupply = BigDecimal(genesisSupply), - miningBlocksSupply = ZERO, miningUnclesSupply = ZERO, includingUnclesSupply = ZERO - ) - } - - private fun consumerConfigs(): MutableMap = defaultConsumerConfig().with( + fun consumerConfigs(): MutableMap = defaultConsumerConfig().with( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase() ) + + private fun currentSupply(): EthereumSupply { + + val createGenesisSupplyFunction = { genesis: BigDecimal -> + EthereumSupply( + blockNumber = 0, uncleNumber = 0, + totalSupply = genesis, genesisSupply = genesis, + miningBlocksSupply = ZERO, miningUnclesSupply = ZERO, includingUnclesSupply = ZERO + ) + } + return currentSupplyProvider.getLastCalculatedSupply( + EthereumSupply::class.java, createGenesisSupplyFunction + ) + } } \ No newline at end of file diff --git a/supply/ethereum/src/test/kotlin/fund/cyber/supply/ethereum/EthereumSupplyBaseTest.kt b/supply/ethereum/src/test/kotlin/fund/cyber/supply/ethereum/EthereumSupplyBaseTest.kt index 870ca1f9..14730f62 100644 --- a/supply/ethereum/src/test/kotlin/fund/cyber/supply/ethereum/EthereumSupplyBaseTest.kt +++ b/supply/ethereum/src/test/kotlin/fund/cyber/supply/ethereum/EthereumSupplyBaseTest.kt @@ -1,10 +1,11 @@ package fund.cyber.supply.ethereum import fund.cyber.common.kafka.BaseKafkaIntegrationTestWithStartedKafka +import fund.cyber.supply.CommonConfiguration import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @TestPropertySource(properties = ["CHAIN_FAMILY:ETHEREUM", "GENESIS_SUPPLY:72009990.50"]) -@ContextConfiguration(classes = [ApplicationConfiguration::class]) +@ContextConfiguration(classes = [ApplicationConfiguration::class, CommonConfiguration::class]) abstract class EthereumSupplyBaseTest : BaseKafkaIntegrationTestWithStartedKafka()