diff --git a/CHANGELOG.md b/CHANGELOG.md index db1534b0..a9d07453 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,13 @@ and this project adheres to [Semantic Visioning](http://semver.org/spec/v2.0.0.h ## [0.3.0] - Bitcoin pump, dump, contract summary ### Added +- [#108](/../../issues/108) Create API for search results preview - [#106](/../../issues/106) Add filter params for search api - [#84](/../../issues/84) API: bitcoin endpoints - [#98](/../../issues/98) Make chain items retrivial caps non-depended - [#85](/../../issues/85) Bitcoin pump/dump and bitcoin contract summary docker images, dockerhub ### Fixed +- [#113](/../../issues/113) Make search API logic dependent of existing keyspaces - [#99](/../../issues/99) Pump stuck if chain reorganization bundles number exceed history stack size diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateContractSummaryRepository.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateContractSummaryRepository.kt index 46ced3ab..98263ec8 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateContractSummaryRepository.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateContractSummaryRepository.kt @@ -19,7 +19,8 @@ interface BitcoinUpdateContractSummaryRepository : ReactiveCrudRepository @Consistency(value = ConsistencyLevel.LOCAL_QUORUM) - fun findAllByHashIn(hashes: Iterable): Flux + fun findAllByHash(hashes: Iterable): Flux = Flux.fromIterable(hashes) + .flatMap { hash -> findByHash(hash) } /** * Return {@code true} if update was successful. diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateContractSummaryRepository.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateContractSummaryRepository.kt index f32eaf78..cfe6f44e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateContractSummaryRepository.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateContractSummaryRepository.kt @@ -20,7 +20,8 @@ interface EthereumUpdateContractSummaryRepository : ReactiveCrudRepository @Consistency(value = ConsistencyLevel.LOCAL_QUORUM) - fun findAllByHashIn(hashes: Iterable): Flux + fun findAllByHash(hashes: Iterable): Flux = Flux.fromIterable(hashes) + .flatMap { hash -> findByHash(hash) } /** * Return {@code true} if update was successful. diff --git a/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/BitcoinContractSummaryStorage.kt b/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/BitcoinContractSummaryStorage.kt index 91ab1550..080ecec6 100644 --- a/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/BitcoinContractSummaryStorage.kt +++ b/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/BitcoinContractSummaryStorage.kt @@ -15,7 +15,7 @@ class BitcoinContractSummaryStorage( override fun findById(id: String): Mono = contractSummaryRepository.findByHash(id) override fun findAllByIdIn(ids: Iterable): Flux = contractSummaryRepository - .findAllByHashIn(ids) + .findAllByHash(ids) override fun update(summary: CqlBitcoinContractSummary, oldVersion: Long): Mono = contractSummaryRepository .update(summary, oldVersion) diff --git a/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt b/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt index d1c9c589..60780e5a 100644 --- a/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt +++ b/contract-summary/bitcoin/src/main/kotlin/fund/cyber/contract/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt @@ -29,6 +29,7 @@ import org.springframework.kafka.listener.config.ContainerProperties import org.springframework.transaction.annotation.EnableTransactionManagement private const val MAX_POLL_RECORDS_CONFIG = 500 +private const val SESSION_TIMEOUT_MS_CONFIG = 30000 @EnableKafka @Configuration @@ -79,6 +80,7 @@ class BitcoinTxConsumerConfiguration { ConsumerConfig.GROUP_ID_CONFIG to "bitcoin-contract-summary-update-process", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(), - ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG + ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG ) } diff --git a/contract-summary/common/src/main/kotlin/fund/cyber/contract/common/delta/apply/UpdateContractSummaryProcess.kt b/contract-summary/common/src/main/kotlin/fund/cyber/contract/common/delta/apply/UpdateContractSummaryProcess.kt index c56156fb..89d3007b 100644 --- a/contract-summary/common/src/main/kotlin/fund/cyber/contract/common/delta/apply/UpdateContractSummaryProcess.kt +++ b/contract-summary/common/src/main/kotlin/fund/cyber/contract/common/delta/apply/UpdateContractSummaryProcess.kt @@ -31,13 +31,13 @@ fun Flux.await(): List { } fun CqlContractSummary.hasSameTopicPartitionAs(delta: ContractSummaryDelta<*>) = - this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition + this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition fun CqlContractSummary.hasSameTopicPartitionAs(topic: String, partition: Int) = - this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition + this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition fun CqlContractSummary.notSameTopicPartitionAs(delta: ContractSummaryDelta<*>) = - hasSameTopicPartitionAs(delta).not() + hasSameTopicPartitionAs(delta).not() fun CqlContractSummary.committed() = this.kafkaDeltaOffsetCommitted @@ -51,14 +51,14 @@ private const val MAX_STORE_ATTEMPTS = 20 private const val STORE_RETRY_TIMEOUT = 30L data class UpdateInfo( - val topic: String, - val partition: Int, - val minOffset: Long, - val maxOffset: Long + val topic: String, + val partition: Int, + val minOffset: Long, + val maxOffset: Long ) { constructor(records: List>) : this( - topic = records.first().topic(), partition = records.first().partition(), - minOffset = records.first().offset(), maxOffset = records.last().offset() + topic = records.first().topic(), partition = records.first().partition(), + minOffset = records.first().offset(), maxOffset = records.last().offset() ) } @@ -83,29 +83,34 @@ class UpdateContractSummaryProcess>, consumer: Consumer<*, *>) { - val info = UpdateInfo(records.sortedBy { record -> record.offset() }) + initMonitors(info) + recordsProcessingTimer.recordCallable { processRecords(records, consumer, info) } + } + + private fun processRecords(records: List>, consumer: Consumer<*, *>, + info: UpdateInfo) { log.info("Processing records for topic: ${info.topic}; partition ${info.partition} from ${info.minOffset}" + - " to ${info.maxOffset} offset.") + " to ${info.maxOffset} offset.") val storeAttempts: MutableMap = mutableMapOf() val previousStates: MutableMap = mutableMapOf() - initMonitors(info) val contracts = deltaProcessor.affectedContracts(records) val contractsSummary = contractSummaryStorage.findAllByIdIn(contracts) - .await().groupBy { a -> a.hash }.map { (k, v) -> k to v.first() }.toMap() + .await().groupBy { a -> a.hash }.map { (k, v) -> k to v.first() }.toMap() val deltas = records.flatMap { record -> deltaProcessor.recordToDeltas(record) } val mergedDeltas = deltas.groupBy { delta -> delta.contract } - .filterKeys { contract -> contract.isNotEmpty() } - .mapValues { contractDeltas -> deltaMerger.mergeDeltas(contractDeltas.value, contractsSummary) } - .filterValues { value -> value != null } - .map { entry -> entry.key to entry.value!! }.toMap() + .filterKeys { contract -> contract.isNotEmpty() } + .mapValues { contractDeltas -> deltaMerger.mergeDeltas(contractDeltas.value, contractsSummary) } + .filterValues { value -> value != null } + .map { entry -> entry.key to entry.value!! }.toMap() try { @@ -140,11 +145,11 @@ class UpdateContractSummaryProcess if (summary.notCommitted() && summary.hasSameTopicPartitionAs(info.topic, info.partition) - && summary.kafkaDeltaOffset in info.minOffset..info.maxOffset) { + && summary.kafkaDeltaOffset in info.minOffset..info.maxOffset) { val previousState = previousStates[summary.hash] if (previousState != null) { contractSummaryStorage.update(previousState) @@ -208,7 +213,7 @@ class UpdateContractSummaryProcess this.kafkaDeltaOffset//todo : = or >= ???? + lastOffsetOf(this.kafkaDeltaTopic, this.kafkaDeltaPartition) > this.kafkaDeltaOffset//todo : = or >= ???? private fun lastOffsetOf(topic: String, partition: Int): Long { val reader = SinglePartitionTopicLastItemsReader( - kafkaBrokers = kafkaBrokers, topic = topic, - keyClass = Any::class.java, valueClass = Any::class.java + kafkaBrokers = kafkaBrokers, topic = topic, + keyClass = Any::class.java, valueClass = Any::class.java ) return reader.readLastOffset(partition) } diff --git a/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/EthereumContractSummaryStorage.kt b/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/EthereumContractSummaryStorage.kt index cd1e2f64..05a7d287 100644 --- a/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/EthereumContractSummaryStorage.kt +++ b/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/EthereumContractSummaryStorage.kt @@ -15,7 +15,7 @@ class EthereumContractSummaryStorage( override fun findById(id: String): Mono = contractSummaryRepository.findByHash(id) override fun findAllByIdIn(ids: Iterable): Flux = contractSummaryRepository - .findAllByHashIn(ids) + .findAllByHash(ids) override fun update(summary: CqlEthereumContractSummary, oldVersion: Long): Mono = contractSummaryRepository.update(summary, oldVersion) diff --git a/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/delta/apply/EthereumConsumerConfiguration.kt b/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/delta/apply/EthereumConsumerConfiguration.kt index c3355794..359e7f8e 100644 --- a/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/delta/apply/EthereumConsumerConfiguration.kt +++ b/contract-summary/ethereum/src/main/kotlin/fund/cyber/contract/ethereum/delta/apply/EthereumConsumerConfiguration.kt @@ -35,6 +35,7 @@ import org.springframework.kafka.listener.config.ContainerProperties import org.springframework.transaction.annotation.EnableTransactionManagement private const val MAX_POLL_RECORDS_CONFIG = 500 +private const val SESSION_TIMEOUT_MS_CONFIG = 30000 @EnableKafka @Configuration @@ -71,13 +72,13 @@ class EthereumTxConsumerConfiguration { fun txListenerContainer(): ConcurrentMessageListenerContainer { val consumerFactory = DefaultKafkaConsumerFactory( - consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumTx::class.java) + consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumTx::class.java) ) val containerProperties = ContainerProperties(chain.txPumpTopic).apply { setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) messageListener = UpdateContractSummaryProcess(contractSummaryStorage, txDeltaProcessor, deltaMerger, - monitoring, kafkaBrokers) + monitoring, kafkaBrokers) isAckOnError = false ackMode = AbstractMessageListenerContainer.AckMode.BATCH } @@ -91,13 +92,13 @@ class EthereumTxConsumerConfiguration { fun blockListenerContainer(): ConcurrentMessageListenerContainer { val consumerFactory = DefaultKafkaConsumerFactory( - consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumBlock::class.java) + consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumBlock::class.java) ) val containerProperties = ContainerProperties(chain.blockPumpTopic).apply { setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) messageListener = UpdateContractSummaryProcess(contractSummaryStorage, blockDeltaProcessor, deltaMerger, - monitoring, kafkaBrokers) + monitoring, kafkaBrokers) isAckOnError = false ackMode = AbstractMessageListenerContainer.AckMode.BATCH } @@ -111,13 +112,13 @@ class EthereumTxConsumerConfiguration { fun uncleListenerContainer(): ConcurrentMessageListenerContainer { val consumerFactory = DefaultKafkaConsumerFactory( - consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumUncle::class.java) + consumerConfigs(), JsonDeserializer(PumpEvent::class.java), JsonDeserializer(EthereumUncle::class.java) ) val containerProperties = ContainerProperties(chain.unclePumpTopic).apply { setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) messageListener = UpdateContractSummaryProcess(contractSummaryStorage, uncleDeltaProcessor, deltaMerger, - monitoring, kafkaBrokers) + monitoring, kafkaBrokers) isAckOnError = false ackMode = AbstractMessageListenerContainer.AckMode.BATCH } @@ -128,11 +129,12 @@ class EthereumTxConsumerConfiguration { } private fun consumerConfigs(): MutableMap = defaultConsumerConfig().with( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.GROUP_ID_CONFIG to "ethereum-contract-summary-update-process", - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, - ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(), - ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.GROUP_ID_CONFIG to "ethereum-contract-summary-update-process", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, + ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(), + ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG ) }