This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

#82 Contract Summary Tx processing gets stuck #117

Merged 2 commits on Apr 23, 2018
CHANGELOG.md: 2 additions, 0 deletions
@@ -8,11 +8,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.3.0] - Bitcoin pump, dump, contract summary
### Added
- [#108](/../../issues/108) Create API for search results preview
- [#106](/../../issues/106) Add filter params for search API
- [#84](/../../issues/84) API: bitcoin endpoints
- [#98](/../../issues/98) Make chain item retrieval caps independent
- [#85](/../../issues/85) Bitcoin pump/dump and Bitcoin contract summary Docker images on Docker Hub
### Fixed
- [#113](/../../issues/113) Make search API logic dependent on existing keyspaces
- [#99](/../../issues/99) Pump stuck when the number of chain reorganization bundles exceeds the history stack size


BitcoinUpdateContractSummaryRepository.kt
@@ -19,7 +19,8 @@ interface BitcoinUpdateContractSummaryRepository : ReactiveCrudRepository<CqlBit
fun findByHash(hash: String): Mono<CqlBitcoinContractSummary>

@Consistency(value = ConsistencyLevel.LOCAL_QUORUM)
-    fun findAllByHashIn(hashes: Iterable<String>): Flux<CqlBitcoinContractSummary>
+    fun findAllByHash(hashes: Iterable<String>): Flux<CqlBitcoinContractSummary> = Flux.fromIterable(hashes)
+            .flatMap { hash -> findByHash(hash) }

/**
* Return {@code true} if update was successful.
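The change above swaps a derived `IN` query for a per-key fan-out: Spring Data Cassandra derives `findAllByHashIn` into a single `SELECT ... WHERE hash IN (...)`, which has to touch many partitions at `LOCAL_QUORUM` and can time out or stall on large key sets, while the default method issues one single-partition `findByHash` per key and merges the results. A minimal sketch of the same pattern (the explicit concurrency cap is an illustrative addition, not part of the PR):

    import reactor.core.publisher.Flux

    // Sketch: fan a multi-get out into single-partition lookups.
    // Flux.flatMap runs the lookups concurrently (default cap 256) and merges
    // rows as they arrive; hashes with no matching row complete empty and
    // simply drop out of the merged result.
    fun findAllByHash(hashes: Iterable<String>): Flux<CqlBitcoinContractSummary> =
        Flux.fromIterable(hashes)
            .flatMap({ hash -> findByHash(hash) }, 64) // illustrative in-flight limit

One consequence worth noting: results arrive in completion order, not input order, which is fine here because the caller groups them by hash anyway.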
EthereumUpdateContractSummaryRepository.kt
@@ -20,7 +20,8 @@ interface EthereumUpdateContractSummaryRepository : ReactiveCrudRepository<CqlEt
fun findByHash(hash: String): Mono<CqlEthereumContractSummary>

@Consistency(value = ConsistencyLevel.LOCAL_QUORUM)
-    fun findAllByHashIn(hashes: Iterable<String>): Flux<CqlEthereumContractSummary>
+    fun findAllByHash(hashes: Iterable<String>): Flux<CqlEthereumContractSummary> = Flux.fromIterable(hashes)
+            .flatMap { hash -> findByHash(hash) }

/**
* Return {@code true} if update was successful.
BitcoinContractSummaryStorage.kt
@@ -15,7 +15,7 @@ class BitcoinContractSummaryStorage(
override fun findById(id: String): Mono<CqlBitcoinContractSummary> = contractSummaryRepository.findByHash(id)

override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlBitcoinContractSummary> = contractSummaryRepository
-            .findAllByHashIn(ids)
+            .findAllByHash(ids)

override fun update(summary: CqlBitcoinContractSummary, oldVersion: Long): Mono<Boolean> = contractSummaryRepository
.update(summary, oldVersion)
BitcoinTxConsumerConfiguration.kt
@@ -29,6 +29,7 @@ import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.transaction.annotation.EnableTransactionManagement

private const val MAX_POLL_RECORDS_CONFIG = 500
+private const val SESSION_TIMEOUT_MS_CONFIG = 30000

@EnableKafka
@Configuration
@@ -79,6 +80,7 @@ class BitcoinTxConsumerConfiguration {
ConsumerConfig.GROUP_ID_CONFIG to "bitcoin-contract-summary-update-process",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(),
-            ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG
+            ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG,
+            ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG
)
}
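These two knobs work together. With `enable.auto.commit=false` and batch acking, a polled batch must be fully processed before the consumer's liveness window runs out; a plausible reading of the bug in #82 is that a slow batch outlived the session timeout, the consumer was evicted and rebalanced, nothing was committed, and the same batch was redelivered over and over, so the pipeline looked stuck. Capping the batch at 500 records and allowing a 30 s session bounds that failure mode. Back-of-envelope (illustrative arithmetic, not from the PR):

    // A batch has to fit inside the liveness window, so the average budget is
    //     30_000 ms / 500 records = 60 ms per record.
    // If one record can cost more than that (say, several Cassandra round
    // trips under contention), lower MAX_POLL_RECORDS_CONFIG or raise
    // SESSION_TIMEOUT_MS_CONFIG.
    const val PER_RECORD_BUDGET_MS: Long = 30_000L / 500 // = 60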
UpdateContractSummaryProcess.kt
@@ -31,13 +31,13 @@ fun <T> Flux<T>.await(): List<T> {
}

fun CqlContractSummary.hasSameTopicPartitionAs(delta: ContractSummaryDelta<*>) =
        this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition

fun CqlContractSummary.hasSameTopicPartitionAs(topic: String, partition: Int) =
        this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition

fun CqlContractSummary.notSameTopicPartitionAs(delta: ContractSummaryDelta<*>) =
        hasSameTopicPartitionAs(delta).not()

fun CqlContractSummary.committed() = this.kafkaDeltaOffsetCommitted
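These extensions read back the provenance that every summary row carries: which topic/partition/offset produced its last delta, and whether that offset was acknowledged back to Kafka. A hypothetical distillation of the `CqlContractSummary` fields they rely on (the interface name is invented for illustration; the field names are the ones used above):

    interface KafkaDeltaProvenance {
        val kafkaDeltaTopic: String            // topic of the last applied delta
        val kafkaDeltaPartition: Int           // partition of the last applied delta
        val kafkaDeltaOffset: Long             // offset of the last applied delta
        val kafkaDeltaOffsetCommitted: Boolean // was that offset committed back to Kafka?
    }

Uncommitted rows whose offsets fall inside a failed batch's range are exactly the ones revertChanges() rolls back below.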

@@ -51,14 +51,14 @@ private const val MAX_STORE_ATTEMPTS = 20
private const val STORE_RETRY_TIMEOUT = 30L

data class UpdateInfo(
        val topic: String,
        val partition: Int,
        val minOffset: Long,
        val maxOffset: Long
) {
constructor(records: List<ConsumerRecord<*, *>>) : this(
            topic = records.first().topic(), partition = records.first().partition(),
            minOffset = records.first().offset(), maxOffset = records.last().offset()
)
}

@@ -83,29 +83,34 @@ class UpdateContractSummaryProcess<R, S : CqlContractSummary, D : ContractSummar
private lateinit var commitCassandraTimer: Timer
private lateinit var downloadCassandraTimer: Timer
private lateinit var currentOffsetMonitor: AtomicLong
+   private lateinit var recordsProcessingTimer: Timer

    override fun onMessage(records: List<ConsumerRecord<PumpEvent, R>>, consumer: Consumer<*, *>) {

        val info = UpdateInfo(records.sortedBy { record -> record.offset() })
+       initMonitors(info)
+       recordsProcessingTimer.recordCallable { processRecords(records, consumer, info) }
    }

+   private fun processRecords(records: List<ConsumerRecord<PumpEvent, R>>, consumer: Consumer<*, *>,
+                              info: UpdateInfo) {

        log.info("Processing records for topic: ${info.topic}; partition ${info.partition} from ${info.minOffset}" +
                " to ${info.maxOffset} offset.")
        val storeAttempts: MutableMap<String, Int> = mutableMapOf()
        val previousStates: MutableMap<String, S?> = mutableMapOf()
-       initMonitors(info)

val contracts = deltaProcessor.affectedContracts(records)

val contractsSummary = contractSummaryStorage.findAllByIdIn(contracts)
                .await().groupBy { a -> a.hash }.map { (k, v) -> k to v.first() }.toMap()

val deltas = records.flatMap { record -> deltaProcessor.recordToDeltas(record) }

val mergedDeltas = deltas.groupBy { delta -> delta.contract }
                .filterKeys { contract -> contract.isNotEmpty() }
                .mapValues { contractDeltas -> deltaMerger.mergeDeltas(contractDeltas.value, contractsSummary) }
                .filterValues { value -> value != null }
                .map { entry -> entry.key to entry.value!! }.toMap()

try {

@@ -140,19 +145,19 @@ class UpdateContractSummaryProcess<R, S : CqlContractSummary, D : ContractSummar
} catch (e: ContractLockException) {

log.debug("Possible contract lock for ${info.topic} topic," +
" ${info.partition} partition, offset: ${info.minOffset}-${info.maxOffset}. Reverting changes...")
" ${info.partition} partition, offset: ${info.minOffset}-${info.maxOffset}. Reverting changes...")
applyLockMonitor.increment()
revertChanges(contracts, previousStates, info)
log.debug("Changes for ${info.topic} topic, ${info.partition} partition," +
" offset: ${info.minOffset}-${info.maxOffset} reverted!")
" offset: ${info.minOffset}-${info.maxOffset} reverted!")
}
}

private fun revertChanges(contracts: Set<String>, previousStates: MutableMap<String, S?>, info: UpdateInfo) {

contractSummaryStorage.findAllByIdIn(contracts).await().forEach { summary ->
            if (summary.notCommitted() && summary.hasSameTopicPartitionAs(info.topic, info.partition)
                    && summary.kafkaDeltaOffset in info.minOffset..info.maxOffset) {
val previousState = previousStates[summary.hash]
if (previousState != null) {
contractSummaryStorage.update(previousState)
@@ -208,7 +213,7 @@ class UpdateContractSummaryProcess<R, S : CqlContractSummary, D : ContractSummar
}

private suspend fun D.applyTo(summary: S): Boolean =
            contractSummaryStorage.update(this.updateSummary(summary), summary.version).await()!!

private suspend fun getSummaryByDelta(delta: D) = contractSummaryStorage.findById(delta.contract).await()
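`update(summary, oldVersion)` is a versioned compare-and-set: it emits `true` only if the stored row still carries `oldVersion`, so a concurrent writer is detected instead of silently overwritten. The `MAX_STORE_ATTEMPTS` and `STORE_RETRY_TIMEOUT` constants above suggest a bounded retry loop around that CAS; a sketch of the shape such a loop would take inside this class (assumed semantics; `delay` is `kotlinx.coroutines.delay`, and the missing-row case is simplified):

    private suspend fun storeWithRetry(delta: D) {
        repeat(MAX_STORE_ATTEMPTS) {
            val summary = getSummaryByDelta(delta) ?: return // no row yet; sketch skips the insert path
            if (delta.applyTo(summary)) return               // CAS succeeded
            delay(STORE_RETRY_TIMEOUT)                       // lost the race; re-read and retry
        }
        throw ContractLockException()                        // contended too long; caller reverts the batch
    }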

@@ -231,18 +236,21 @@ class UpdateContractSummaryProcess<R, S : CqlContractSummary, D : ContractSummar
}
if (!(::currentOffsetMonitor.isInitialized)) {
currentOffsetMonitor = monitoring.gauge("contract_summary_topic_current_offset", tags,
                    AtomicLong(info.maxOffset))!!
}
+       if (!(::recordsProcessingTimer.isInitialized)) {
+           recordsProcessingTimer = monitoring.timer("contract_summary_records_processing", tags)
+       }
}

private fun CqlContractSummary.currentTopicPartitionWentFurther() =
            lastOffsetOf(this.kafkaDeltaTopic, this.kafkaDeltaPartition) > this.kafkaDeltaOffset//todo : = or >= ????

private fun lastOffsetOf(topic: String, partition: Int): Long {

val reader = SinglePartitionTopicLastItemsReader(
                kafkaBrokers = kafkaBrokers, topic = topic,
                keyClass = Any::class.java, valueClass = Any::class.java
)
return reader.readLastOffset(partition)
}
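`SinglePartitionTopicLastItemsReader` is project-internal; functionally, `readLastOffset` has to resolve the partition's end offset so the process can tell whether the topic-partition has moved past the offset recorded in a summary row. A hedged equivalent using only the plain kafka-clients API (the `- 1` convention for "offset of the last existing record" is an assumption about the internal reader):

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import java.util.Properties

    fun lastOffsetOfSketch(kafkaBrokers: String, topic: String, partition: Int): Long {
        val props = Properties().apply {
            put("bootstrap.servers", kafkaBrokers)
            put("key.deserializer", ByteArrayDeserializer::class.java.name)
            put("value.deserializer", ByteArrayDeserializer::class.java.name)
        }
        KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
            val tp = TopicPartition(topic, partition)
            // endOffsets returns the offset of the *next* record to be written,
            // hence -1 for the last record that actually exists.
            return consumer.endOffsets(listOf(tp)).getValue(tp) - 1
        }
    }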
EthereumContractSummaryStorage.kt
@@ -15,7 +15,7 @@ class EthereumContractSummaryStorage(
override fun findById(id: String): Mono<CqlEthereumContractSummary> = contractSummaryRepository.findByHash(id)

override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlEthereumContractSummary> = contractSummaryRepository
-            .findAllByHashIn(ids)
+            .findAllByHash(ids)

override fun update(summary: CqlEthereumContractSummary, oldVersion: Long): Mono<Boolean>
= contractSummaryRepository.update(summary, oldVersion)
EthereumTxConsumerConfiguration.kt
@@ -35,6 +35,7 @@ import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.transaction.annotation.EnableTransactionManagement

private const val MAX_POLL_RECORDS_CONFIG = 500
+private const val SESSION_TIMEOUT_MS_CONFIG = 30000

@EnableKafka
@Configuration
@@ -71,13 +72,13 @@ class EthereumTxConsumerConfiguration {
fun txListenerContainer(): ConcurrentMessageListenerContainer<PumpEvent, EthereumTx> {

val consumerFactory = DefaultKafkaConsumerFactory(
                consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumTx::class.java)
)

val containerProperties = ContainerProperties(chain.txPumpTopic).apply {
setBatchErrorHandler(SeekToCurrentBatchErrorHandler())
messageListener = UpdateContractSummaryProcess(contractSummaryStorage, txDeltaProcessor, deltaMerger,
                    monitoring, kafkaBrokers)
isAckOnError = false
ackMode = AbstractMessageListenerContainer.AckMode.BATCH
}
@@ -91,13 +92,13 @@ class EthereumTxConsumerConfiguration {
fun blockListenerContainer(): ConcurrentMessageListenerContainer<PumpEvent, EthereumBlock> {

val consumerFactory = DefaultKafkaConsumerFactory(
                consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumBlock::class.java)
)

val containerProperties = ContainerProperties(chain.blockPumpTopic).apply {
setBatchErrorHandler(SeekToCurrentBatchErrorHandler())
messageListener = UpdateContractSummaryProcess(contractSummaryStorage, blockDeltaProcessor, deltaMerger,
                    monitoring, kafkaBrokers)
isAckOnError = false
ackMode = AbstractMessageListenerContainer.AckMode.BATCH
}
@@ -111,13 +112,13 @@ class EthereumTxConsumerConfiguration {
fun uncleListenerContainer(): ConcurrentMessageListenerContainer<PumpEvent, EthereumUncle> {

val consumerFactory = DefaultKafkaConsumerFactory(
                consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumUncle::class.java)
)

val containerProperties = ContainerProperties(chain.unclePumpTopic).apply {
setBatchErrorHandler(SeekToCurrentBatchErrorHandler())
messageListener = UpdateContractSummaryProcess(contractSummaryStorage, uncleDeltaProcessor, deltaMerger,
                    monitoring, kafkaBrokers)
isAckOnError = false
ackMode = AbstractMessageListenerContainer.AckMode.BATCH
}
@@ -128,11 +129,12 @@ class EthereumTxConsumerConfiguration {
}

private fun consumerConfigs(): MutableMap<String, Any> = defaultConsumerConfig().with(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
            ConsumerConfig.GROUP_ID_CONFIG to "ethereum-contract-summary-update-process",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(),
-           ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG
+           ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG,
+           ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG
)
}
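Taken together with the consumer settings, the three listener containers in this file (and their Bitcoin counterparts) pin down the delivery semantics the summary process relies on: offsets are committed only after a batch fully succeeds, and a failed batch is re-sought and redelivered in order, which is what lets the revert-on-lock path assume the records will come back. A condensed, annotated restatement of the container wiring (same spring-kafka era as the imports above; the topic name is illustrative):

    val props = ContainerProperties("illustrative-topic").apply {
        // A throwing batch listener makes the container seek each partition back
        // to the failed batch's first record, so the next poll redelivers the
        // whole batch: at-least-once, with order preserved.
        setBatchErrorHandler(SeekToCurrentBatchErrorHandler())
        // Commit offsets once per successfully processed batch...
        ackMode = AbstractMessageListenerContainer.AckMode.BATCH
        // ...and never commit on behalf of a batch whose listener threw.
        isAckOnError = false
    }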