Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

#32 Error sending message to kafka #81

Merged
merged 2 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import fund.cyber.common.kafka.JsonDeserializer
import fund.cyber.address.bitcoin.BitcoinAddressSummaryStorage
import fund.cyber.address.bitcoin.summary.BitcoinDeltaMerger
import fund.cyber.address.bitcoin.summary.BitcoinTxDeltaProcessor
import fund.cyber.common.kafka.defaultConsumerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
import fund.cyber.search.model.bitcoin.BitcoinTx
Expand Down Expand Up @@ -71,7 +73,7 @@ class BitcoinTxConsumerConfiguration {
}
}

private fun consumerConfigs(): MutableMap<String, Any> = mutableMapOf(
private fun consumerConfigs(): MutableMap<String, Any> = defaultConsumerConfig().with(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.GROUP_ID_CONFIG to "bitcoin-address-summary-update-process",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import fund.cyber.address.ethereum.summary.EthereumDeltaMerger
import fund.cyber.address.ethereum.summary.EthereumTxDeltaProcessor
import fund.cyber.address.ethereum.summary.EthereumUncleDeltaProcessor
import fund.cyber.common.kafka.JsonDeserializer
import fund.cyber.common.kafka.defaultConsumerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
import fund.cyber.search.model.chains.Chain
Expand Down Expand Up @@ -125,7 +127,7 @@ class EthereumTxConsumerConfiguration {
}
}

private fun consumerConfigs(): MutableMap<String, Any> = mutableMapOf(
private fun consumerConfigs(): MutableMap<String, Any> = defaultConsumerConfig().with(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.GROUP_ID_CONFIG to "ethereum-address-summary-update-process",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package fund.cyber.common.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

const val KAFKA_MAX_MESSAGE_SIZE_BYTES = 15728640

fun defaultConsumerConfig() = mutableMapOf<String,Any>(
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES
)

fun defaultProducerConfig() = mutableMapOf<String,Any>(
ProducerConfig.MAX_REQUEST_SIZE_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES
)


7 changes: 7 additions & 0 deletions common/src/main/kotlin/fund/cyber/common/Collections.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package fund.cyber.common


fun <K, V> MutableMap<K, V>.with(vararg pairs: Pair<K, V>): MutableMap<K, V> {
putAll(pairs)
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fund.cyber.dump.bitcoin

import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository
import fund.cyber.common.kafka.JsonDeserializer
import fund.cyber.common.kafka.defaultConsumerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.CHAIN
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
Expand Down Expand Up @@ -62,7 +64,7 @@ class ApplicationConfiguration {
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}

private fun consumerConfigs(): MutableMap<String, Any> = mutableMapOf(
private fun consumerConfigs(): MutableMap<String, Any> = defaultConsumerConfig().with(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import fund.cyber.cassandra.ethereum.repository.EthereumBlockTxRepository
import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository
import fund.cyber.cassandra.ethereum.repository.EthereumUncleRepository
import fund.cyber.common.kafka.JsonDeserializer
import fund.cyber.common.kafka.defaultConsumerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.CHAIN
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
Expand Down Expand Up @@ -138,7 +140,7 @@ class ApplicationConfiguration {
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}

private fun consumerConfigs(): MutableMap<String, Any> = mutableMapOf(
private fun consumerConfigs(): MutableMap<String, Any> = defaultConsumerConfig().with(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fund.cyber.pump.bitcoin.kafka

import fund.cyber.common.kafka.JsonSerializer
import fund.cyber.common.kafka.defaultProducerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
import fund.cyber.search.model.chains.BitcoinFamilyChain
Expand Down Expand Up @@ -67,7 +69,7 @@ class BitcoinBundleProducerConfiguration {
}

@Bean
fun producerConfigs(): Map<String, Any> = mapOf(
fun producerConfigs(): Map<String, Any> = defaultProducerConfig().with(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fund.cyber.pump.ethereum.kafka

import fund.cyber.common.kafka.JsonSerializer
import fund.cyber.common.kafka.defaultProducerConfig
import fund.cyber.common.with
import fund.cyber.search.configuration.KAFKA_BROKERS
import fund.cyber.search.configuration.KAFKA_BROKERS_DEFAULT
import fund.cyber.search.model.chains.EthereumFamilyChain
Expand Down Expand Up @@ -66,7 +68,7 @@ class EthereumBundleProducerConfiguration {
}

@Bean
fun producerConfigs(): Map<String, Any> = mapOf(
fun producerConfigs(): Map<String, Any> = defaultProducerConfig().with(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers
)

Expand Down