From 3b698c728842de98fd14803e5a3447f14eb5ad30 Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Tue, 27 Mar 2018 17:08:53 +0300 Subject: [PATCH] #73 Add detekt static analytic for source code. Fix detekt issues --- .../bitcoin/BitcoinAddressSummaryStorage.kt | 17 +++--- .../apply/BitcoinTxConsumerConfiguration.kt | 9 ++-- .../summary/BitcoinAddressSummaryDelta.kt | 6 +-- .../address/common/CommonConfiguration.kt | 2 +- .../common/delta/AddressSummaryDelta.kt | 2 +- .../delta/apply/AddressLockException.kt | 2 +- .../apply/UpdateAddressSummaryProcess.kt | 54 +++++++++++-------- .../common/summary/AddressSummaryStorage.kt | 2 +- ...UpdateEthereumAddressSummaryApplication.kt | 2 +- .../ethereum/EthereumAddressSummaryStorage.kt | 17 +++--- .../apply/EthereumConsumerConfiguration.kt | 6 ++- .../summary/EthereumAddressSummaryDelta.kt | 24 +++++---- .../BitcoinRepositoryConfiguration.kt | 15 ++++-- .../cyber/cassandra/bitcoin/model/Address.kt | 2 +- .../cyber/cassandra/bitcoin/model/Block.kt | 13 +++-- .../cassandra/bitcoin/model/Transaction.kt | 5 +- .../bitcoin/repository/TxRepositories.kt | 2 +- .../UpdateAddressSummaryRepository.kt | 13 +++-- .../cassandra/common/CqlAddressSummary.kt | 2 +- .../cassandra/common/NoChainCondition.kt | 2 +- .../configuration/CassandraConfiguration.kt | 12 +++-- .../EthereumRepositoryConfiguration.kt | 34 ++++++++---- .../cyber/cassandra/ethereum/model/Address.kt | 8 +-- .../cyber/cassandra/ethereum/model/Block.kt | 8 +-- .../repository/AddressRepositories.kt | 7 ++- .../ethereum/repository/BlockRepositories.kt | 2 +- .../ethereum/repository/TxRepositories.kt | 2 +- .../ethereum/repository/UncleRepositories.kt | 2 +- .../UpdateAddressSummaryRepository.kt | 12 +++-- .../ElassandraSchemaMigrationEngine.kt | 38 +++++++------ .../migration/ElasticHttpMigration.kt | 2 +- .../cassandra/migration/MigrationSettings.kt | 2 +- .../cassandra/migration/MigrationsLoader.kt | 6 ++- .../MigrationRepositoryConfiguration.kt | 8 ++- .../cyber/cassandra/migration/model/Schema.kt | 2 +- .../repository/SchemaRepositories.kt | 9 ++-- .../SinglePartitionTopicLastItemsReader.kt | 8 +-- .../kafka/BaseForKafkaIntegrationTest.kt | 2 +- .../fund/cyber/common/kafka/ProducerUtils.kt | 2 +- .../SinglePartitionTopicDataPresentLatch.kt | 2 +- .../SinglePartitionLackOfRecordsReaderTest.kt | 2 +- ...tionMultipleTransactionRecordReaderTest.kt | 2 +- .../SinglePartitionNonRecordsReaderTest.kt | 2 +- ...rtitionSingeTransactionRecordReaderTest.kt | 2 +- .../kotlin/fund/cyber/common/Concurrency.kt | 2 +- .../src/main/kotlin/fund/cyber/common/IO.kt | 2 +- .../src/main/kotlin/fund/cyber/common/Math.kt | 5 +- .../kotlin/fund/cyber/common/StackCache.kt | 2 +- .../fund/cyber/search/configuration/Env.kt | 2 +- .../kotlin/fund/cyber/search/model/JsonRpc.kt | 2 +- .../cyber/search/model/bitcoin/Address.kt | 2 +- .../search/model/bitcoin/BitcoinJsonRpc.kt | 2 +- .../fund/cyber/search/model/bitcoin/Block.kt | 2 +- .../cyber/search/model/bitcoin/Transaction.kt | 5 +- .../fund/cyber/search/model/chains/Bitcoin.kt | 2 +- .../cyber/search/model/chains/Ethereum.kt | 2 +- .../fund/cyber/search/model/ethereum/Block.kt | 9 ++-- .../search/model/ethereum/Transaction.kt | 2 +- .../fund/cyber/search/model/ethereum/Uncle.kt | 8 +-- .../cyber/search/model/events/PumpEvents.kt | 2 +- .../fund/cyber/search/model/events/Topics.kt | 2 +- .../fund/cyber/node/common/StackCacheTest.kt | 2 +- detekt.yml | 10 ++-- .../main/kotlin/fund/cyber/DumpApplication.kt | 2 +- .../dump/bitcoin/ApplicationConfiguration.kt | 5 +- .../cyber/dump/bitcoin/BlockDumpProcess.kt | 2 +- .../main/kotlin/fund/cyber/DumpApplication.kt | 2 +- .../dump/ethereum/ApplcationConfiguration.kt | 30 +++++++---- .../cyber/dump/ethereum/BlockDumpProcess.kt | 2 +- .../fund/cyber/dump/ethereum/TxDumpProcess.kt | 10 ++-- .../cyber/dump/ethereum/UncleDumpProcess.kt | 8 +-- .../fund/cyber/pump/BlockchainInterface.kt | 2 +- .../kotlin/fund/cyber/pump/PumpApplication.kt | 2 +- .../client/BitcoinBlockchainInterface.kt | 2 +- .../client/BitcoinClientConfiguration.kt | 9 ++-- .../bitcoin/client/BitcoinJsonRpcClient.kt | 2 +- .../JsonRpcToDaoBitcoinBlockConverter.kt | 10 ++-- .../client/JsonRpcToDaoBitcoinTxConverter.kt | 11 +++- .../genesis/BitcoinGenesisBundleProvider.kt | 2 +- .../bitcoin/kafka/BitcoinBundleProducer.kt | 2 +- .../BitcoinBundleProducerConfiguration.kt | 9 ++-- .../kafka/LastPumpedBitcoinBundlesProvider.kt | 2 +- .../BtcdToDaoTransactionConverterTest.kt | 2 +- .../fund/cyber/pump/common/ChainPump.kt | 5 +- .../cyber/pump/common/CommonConfiguration.kt | 8 +-- .../common/kafka/KafkaBlockBundleProducer.kt | 2 +- .../common/kafka/LastPumpedBlocksProvider.kt | 2 +- .../LastNetworkBlockNumberMonitoring.kt | 12 +++-- .../pump/common/node/BlockchainInterface.kt | 2 +- .../node/FlowableBlockchainInterface.kt | 10 ++-- .../main/kotlin/fund/cyber/PumpApplication.kt | 2 +- .../client/EthereumBlockchainInterface.kt | 6 ++- .../client/EthereumClientConfiguration.kt | 9 ++-- .../client/ParityToEthereumBundleConverter.kt | 11 +++- .../genesis/EthereumGenesisBundleProvider.kt | 2 +- .../kafka/EthereumBlockBundleProducer.kt | 2 +- .../EthereumBundleProducerConfiguration.kt | 9 ++-- .../kafka/LastPumpedEthereumBundleProvider.kt | 2 +- .../monitoring/LastTopicOffsetMonitoring.kt | 10 +++- .../kotlin/fund/cyber/SearchApiApplication.kt | 2 +- .../cyber/api/common/CommonConfiguration.kt | 14 +++-- .../fund/cyber/api/common/MetricsWebFilter.kt | 5 +- .../fund/cyber/api/common/PingController.kt | 2 +- .../cyber/api/ethereum/AddressHandlers.kt | 5 +- .../fund/cyber/api/ethereum/BlockHandlers.kt | 5 +- .../fund/cyber/api/ethereum/TxHandlers.kt | 2 +- .../ethereum/functions/AddressTxesByAddres.kt | 2 +- .../functions/BlockTxesByBlockNumber.kt | 2 +- .../fund/cyber/api/search/SearchController.kt | 2 +- .../fund/cyber/api/search/SearchModel.kt | 2 +- 110 files changed, 432 insertions(+), 259 deletions(-) diff --git a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/BitcoinAddressSummaryStorage.kt b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/BitcoinAddressSummaryStorage.kt index 59e3d99d..ce7789ef 100644 --- a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/BitcoinAddressSummaryStorage.kt +++ b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/BitcoinAddressSummaryStorage.kt @@ -14,15 +14,20 @@ class BitcoinAddressSummaryStorage( override fun findById(id: String): Mono = addressSummaryRepository.findById(id) - override fun findAllByIdIn(ids: Iterable): Flux = addressSummaryRepository.findAllByIdIn(ids) + override fun findAllByIdIn(ids: Iterable): Flux = addressSummaryRepository + .findAllByIdIn(ids) - override fun update(summary: CqlBitcoinAddressSummary, oldVersion: Long): Mono = addressSummaryRepository.update(summary, oldVersion) + override fun update(summary: CqlBitcoinAddressSummary, oldVersion: Long): Mono = addressSummaryRepository + .update(summary, oldVersion) - override fun insertIfNotRecord(summary: CqlBitcoinAddressSummary): Mono = addressSummaryRepository.insertIfNotRecord(summary) + override fun insertIfNotRecord(summary: CqlBitcoinAddressSummary): Mono = addressSummaryRepository + .insertIfNotRecord(summary) - override fun commitUpdate(address: String, newVersion: Long): Mono = addressSummaryRepository.commitUpdate(address, newVersion) + override fun commitUpdate(address: String, newVersion: Long): Mono = addressSummaryRepository + .commitUpdate(address, newVersion) - override fun update(summary: CqlBitcoinAddressSummary): Mono = addressSummaryRepository.save(summary) + override fun update(summary: CqlBitcoinAddressSummary): Mono = addressSummaryRepository + .save(summary) override fun remove(address: String): Mono = addressSummaryRepository.deleteById(address) -} \ No newline at end of file +} diff --git a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt index 67e4b4e1..cdb866c0 100644 --- a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt +++ b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/delta/apply/BitcoinTxConsumerConfiguration.kt @@ -20,11 +20,14 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.listener.* import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.BATCH +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler import org.springframework.kafka.listener.config.ContainerProperties import org.springframework.transaction.annotation.EnableTransactionManagement +private const val MAX_POLL_RECORDS_CONFIG = 10 + @EnableKafka @Configuration @EnableTransactionManagement @@ -74,6 +77,6 @@ class BitcoinTxConsumerConfiguration { ConsumerConfig.GROUP_ID_CONFIG to "bitcoin-address-summary-update-process", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(), - ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 10 + ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG ) -} \ No newline at end of file +} diff --git a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/summary/BitcoinAddressSummaryDelta.kt b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/summary/BitcoinAddressSummaryDelta.kt index 489d507c..e3192fbc 100644 --- a/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/summary/BitcoinAddressSummaryDelta.kt +++ b/address-summary/bitcoin/src/main/kotlin/fund/cyber/address/bitcoin/summary/BitcoinAddressSummaryDelta.kt @@ -104,10 +104,10 @@ class BitcoinDeltaMerger: DeltaMerger { val existingSummary = currentAddresses[first.address] - // todo: what if deltas to apply is empty? Case: we didn't commit offset range and dropped. Then restored with the same offset range val deltasToApply = deltas.filterNot { delta -> existingSummary != null && existingSummary.kafkaDeltaTopic == delta.topic - && existingSummary.kafkaDeltaPartition == delta.partition && delta.offset <= existingSummary.kafkaDeltaOffset + && existingSummary.kafkaDeltaPartition == delta.partition + && delta.offset <= existingSummary.kafkaDeltaOffset } val balance = deltasToApply.sumByDecimal { delta -> delta.balanceDelta } val totalReceived = deltasToApply.sumByDecimal { delta -> delta.totalReceivedDelta } @@ -119,4 +119,4 @@ class BitcoinDeltaMerger: DeltaMerger { offset = deltasToApply.maxBy { it -> it.offset }!!.offset ) } -} \ No newline at end of file +} diff --git a/address-summary/common/src/main/kotlin/fund/cyber/address/common/CommonConfiguration.kt b/address-summary/common/src/main/kotlin/fund/cyber/address/common/CommonConfiguration.kt index 6777435b..3de60ff5 100644 --- a/address-summary/common/src/main/kotlin/fund/cyber/address/common/CommonConfiguration.kt +++ b/address-summary/common/src/main/kotlin/fund/cyber/address/common/CommonConfiguration.kt @@ -19,4 +19,4 @@ class CommonConfiguration { fun metricsCommonTags(): MeterRegistryCustomizer { return MeterRegistryCustomizer { registry -> registry.config().commonTags("chain", chain.name) } } -} \ No newline at end of file +} diff --git a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/AddressSummaryDelta.kt b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/AddressSummaryDelta.kt index c034d972..9ecc9b93 100644 --- a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/AddressSummaryDelta.kt +++ b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/AddressSummaryDelta.kt @@ -23,4 +23,4 @@ interface DeltaProcessor> //todo this class should not be aware of kafka records interface DeltaMerger> { fun mergeDeltas(deltas: Iterable, currentAddresses: Map): D? -} \ No newline at end of file +} diff --git a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/AddressLockException.kt b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/AddressLockException.kt index c7c37330..6265b5f9 100644 --- a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/AddressLockException.kt +++ b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/AddressLockException.kt @@ -1,3 +1,3 @@ package fund.cyber.address.common.delta.apply -class AddressLockException: Exception() \ No newline at end of file +class AddressLockException: Exception() diff --git a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/UpdateAddressSummaryProcess.kt b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/UpdateAddressSummaryProcess.kt index f6922bc9..0f06e8fc 100644 --- a/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/UpdateAddressSummaryProcess.kt +++ b/address-summary/common/src/main/kotlin/fund/cyber/address/common/delta/apply/UpdateAddressSummaryProcess.kt @@ -30,9 +30,26 @@ fun Flux.await(): List { return this.collectList().block()!! } +fun CqlAddressSummary.hasSameTopicPartitionAs(delta: AddressSummaryDelta<*>) = + this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition + +fun CqlAddressSummary.hasSameTopicPartitionAs(topic: String, partition: Int) = + this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition + +fun CqlAddressSummary.notSameTopicPartitionAs(delta: AddressSummaryDelta<*>) = + hasSameTopicPartitionAs(delta).not() + +fun CqlAddressSummary.committed() = this.kafkaDeltaOffsetCommitted + +fun CqlAddressSummary.notCommitted() = committed().not() + +@Suppress("MagicNumber") private val applicationContext = newFixedThreadPoolContext(8, "Coroutines Concurrent Pool") private val log = LoggerFactory.getLogger(UpdateAddressSummaryProcess::class.java)!! +private const val MAX_STORE_ATTEMPTS = 20 +private const val STORE_RETRY_TIMEOUT = 30L + data class UpdateInfo( val topic: String, val partition: Int, @@ -92,14 +109,18 @@ class UpdateAddressSummaryProcess - async(applicationContext) { store(addressesSummary[delta.address], delta, storeAttempts, previousStates) } + async(applicationContext) { + store(addressesSummary[delta.address], delta, storeAttempts, previousStates) + } }.map { it.await() } } } commitKafkaTimer.recordCallable { consumer.commitSync() } - val newSummaries = downloadCassandraTimer.recordCallable { addressSummaryStorage.findAllByIdIn(addresses).await() } + val newSummaries = downloadCassandraTimer.recordCallable { + addressSummaryStorage.findAllByIdIn(addresses).await() + } commitCassandraTimer.recordCallable { runBlocking { @@ -139,7 +160,9 @@ class UpdateAddressSummaryProcess, previousStates: MutableMap) { + @Suppress("ComplexMethod", "NestedBlockDepth") + private suspend fun store(summary: S?, delta: D, storeAttempts: MutableMap, + previousStates: MutableMap) { previousStates[delta.address] = summary if (summary != null) { @@ -155,7 +178,7 @@ class UpdateAddressSummaryProcess 20) { + if (storeAttempts[delta.address] ?: 0 > MAX_STORE_ATTEMPTS) { if (summary.currentTopicPartitionWentFurther()) { val result = delta.applyTo(summary) if (!result) { @@ -166,7 +189,7 @@ class UpdateAddressSummaryProcess this.kafkaDeltaOffset//todo : = or >= ???? - private fun initMonitors(info: UpdateInfo) { val tags = Tags.of("topic", info.topic) if (!(::applyLockMonitor.isInitialized)) { @@ -225,6 +232,9 @@ class UpdateAddressSummaryProcess this.kafkaDeltaOffset//todo : = or >= ???? + private fun lastOffsetOf(topic: String, partition: Int): Long { val reader = SinglePartitionTopicLastItemsReader( @@ -241,4 +251,4 @@ class UpdateAddressSummaryProcess { fun commitUpdate(address: String, newVersion: Long): Mono fun update(summary: S): Mono fun remove(address: String): Mono -} \ No newline at end of file +} diff --git a/address-summary/ethereum/src/main/kotlin/fund/cyber/UpdateEthereumAddressSummaryApplication.kt b/address-summary/ethereum/src/main/kotlin/fund/cyber/UpdateEthereumAddressSummaryApplication.kt index 41bfb1ec..19b1ed1f 100644 --- a/address-summary/ethereum/src/main/kotlin/fund/cyber/UpdateEthereumAddressSummaryApplication.kt +++ b/address-summary/ethereum/src/main/kotlin/fund/cyber/UpdateEthereumAddressSummaryApplication.kt @@ -25,4 +25,4 @@ class UpdateEthereumAddressSummaryApplication { SpringApplication.run(UpdateEthereumAddressSummaryApplication::class.java, *args) } } -} \ No newline at end of file +} diff --git a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/EthereumAddressSummaryStorage.kt b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/EthereumAddressSummaryStorage.kt index d71e2765..2fde614f 100644 --- a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/EthereumAddressSummaryStorage.kt +++ b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/EthereumAddressSummaryStorage.kt @@ -14,15 +14,20 @@ class EthereumAddressSummaryStorage( override fun findById(id: String): Mono = addressSummaryRepository.findById(id) - override fun findAllByIdIn(ids: Iterable): Flux = addressSummaryRepository.findAllById(ids) + override fun findAllByIdIn(ids: Iterable): Flux = addressSummaryRepository + .findAllById(ids) - override fun update(summary: CqlEthereumAddressSummary, oldVersion: Long): Mono = addressSummaryRepository.update(summary, oldVersion) + override fun update(summary: CqlEthereumAddressSummary, oldVersion: Long): Mono = addressSummaryRepository + .update(summary, oldVersion) - override fun insertIfNotRecord(summary: CqlEthereumAddressSummary): Mono = addressSummaryRepository.insertIfNotRecord(summary) + override fun insertIfNotRecord(summary: CqlEthereumAddressSummary): Mono = addressSummaryRepository + .insertIfNotRecord(summary) - override fun commitUpdate(address: String, newVersion: Long): Mono = addressSummaryRepository.commitUpdate(address, newVersion) + override fun commitUpdate(address: String, newVersion: Long): Mono = addressSummaryRepository + .commitUpdate(address, newVersion) - override fun update(summary: CqlEthereumAddressSummary): Mono = addressSummaryRepository.save(summary) + override fun update(summary: CqlEthereumAddressSummary): Mono = addressSummaryRepository + .save(summary) override fun remove(address: String): Mono = addressSummaryRepository.deleteById(address) -} \ No newline at end of file +} diff --git a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/delta/apply/EthereumConsumerConfiguration.kt b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/delta/apply/EthereumConsumerConfiguration.kt index b6c4dae4..bc140c43 100644 --- a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/delta/apply/EthereumConsumerConfiguration.kt +++ b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/delta/apply/EthereumConsumerConfiguration.kt @@ -32,6 +32,8 @@ import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler import org.springframework.kafka.listener.config.ContainerProperties import org.springframework.transaction.annotation.EnableTransactionManagement +private const val MAX_POLL_RECORDS_CONFIG = 500 + @EnableKafka @Configuration @EnableTransactionManagement @@ -129,6 +131,6 @@ class EthereumTxConsumerConfiguration { ConsumerConfig.GROUP_ID_CONFIG to "ethereum-address-summary-update-process", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(), - ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500 + ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG ) -} \ No newline at end of file +} diff --git a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/summary/EthereumAddressSummaryDelta.kt b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/summary/EthereumAddressSummaryDelta.kt index 469a37cf..352bd85a 100644 --- a/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/summary/EthereumAddressSummaryDelta.kt +++ b/address-summary/ethereum/src/main/kotlin/fund/cyber/address/ethereum/summary/EthereumAddressSummaryDelta.kt @@ -48,9 +48,11 @@ data class EthereumAddressSummaryDelta( override fun updateSummary(summary: CqlEthereumAddressSummary): CqlEthereumAddressSummary { return CqlEthereumAddressSummary( - id = summary.id, confirmedBalance = summary.confirmedBalance + this.balanceDelta, contractAddress = summary.contractAddress, + id = summary.id, confirmedBalance = summary.confirmedBalance + this.balanceDelta, + contractAddress = summary.contractAddress, confirmedTotalReceived = summary.confirmedTotalReceived + this.totalReceivedDelta, - txNumber = summary.txNumber + this.txNumberDelta, minedUncleNumber = summary.minedUncleNumber + this.uncleNumberDelta, + txNumber = summary.txNumber + this.txNumberDelta, + minedUncleNumber = summary.minedUncleNumber + this.uncleNumberDelta, minedBlockNumber = summary.minedBlockNumber + this.minedBlockNumberDelta, kafkaDeltaOffset = this.offset, kafkaDeltaTopic = this.topic, kafkaDeltaPartition = this.partition, version = summary.version + 1 @@ -74,8 +76,8 @@ class EthereumTxDeltaProcessor : DeltaProcessor { +class EthereumBlockDeltaProcessor + : DeltaProcessor { override fun recordToDeltas(record: ConsumerRecord): List { @@ -121,7 +124,8 @@ class EthereumBlockDeltaProcessor : DeltaProcessor { +class EthereumUncleDeltaProcessor + : DeltaProcessor { override fun recordToDeltas(record: ConsumerRecord): List { @@ -154,7 +158,8 @@ class EthereumDeltaMerger : DeltaMerger { val deltasToApply = deltas.filterNot { delta -> existingSummary != null && existingSummary.kafkaDeltaTopic == delta.topic - && existingSummary.kafkaDeltaPartition == delta.partition && delta.offset <= existingSummary.kafkaDeltaOffset + && existingSummary.kafkaDeltaPartition == delta.partition + && delta.offset <= existingSummary.kafkaDeltaOffset } val balance = deltasToApply.sumByDecimal { delta -> delta.balanceDelta } val totalReceived = deltasToApply.sumByDecimal { delta -> delta.totalReceivedDelta } @@ -166,7 +171,8 @@ class EthereumDeltaMerger : DeltaMerger { address = first.address, balanceDelta = balance, totalReceivedDelta = totalReceived, txNumberDelta = txNumber, minedBlockNumberDelta = blockNumber, uncleNumberDelta = uncleNumber, contractAddress = deltasToApply.any { delta -> delta.contractAddress ?: false }, - topic = first.topic, partition = first.partition, offset = deltasToApply.maxBy { it -> it.offset }!!.offset + topic = first.topic, partition = first.partition, + offset = deltasToApply.maxBy { it -> it.offset }!!.offset ) } -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt index 0687ceae..4f9d8dd1 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt @@ -4,10 +4,19 @@ import fund.cyber.cassandra.configuration.CassandraRepositoriesConfiguration import fund.cyber.cassandra.configuration.keyspace import fund.cyber.cassandra.migration.BlockchainMigrationSettings import fund.cyber.cassandra.migration.MigrationSettings -import fund.cyber.search.configuration.* +import fund.cyber.search.configuration.CASSANDRA_HOSTS +import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT +import fund.cyber.search.configuration.CASSANDRA_PORT +import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT +import fund.cyber.search.configuration.CHAIN +import fund.cyber.search.configuration.env import fund.cyber.search.model.chains.BitcoinFamilyChain import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.* +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Condition +import org.springframework.context.annotation.ConditionContext +import org.springframework.context.annotation.Conditional +import org.springframework.context.annotation.Configuration import org.springframework.core.type.AnnotatedTypeMetadata import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories @@ -41,4 +50,4 @@ private class BitcoinFamilyChainCondition : Condition { val chain = context.environment.getProperty(CHAIN) ?: "" return BitcoinFamilyChain.values().map(BitcoinFamilyChain::name).contains(chain) } -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Address.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Address.kt index 9a5b25f6..42fcfd1e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Address.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Address.kt @@ -35,4 +35,4 @@ data class CqlBitcoinAddressTx( val block_number: Long, val ins: List, val outs: List -) : CqlBitcoinItem \ No newline at end of file +) : CqlBitcoinItem diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt index da0a7694..1fba8702 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt @@ -4,7 +4,10 @@ package fund.cyber.cassandra.bitcoin.model import fund.cyber.search.model.bitcoin.BitcoinBlock import org.springframework.data.cassandra.core.cql.PrimaryKeyType -import org.springframework.data.cassandra.core.mapping.* +import org.springframework.data.cassandra.core.mapping.PrimaryKey +import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn +import org.springframework.data.cassandra.core.mapping.Table +import org.springframework.data.cassandra.core.mapping.UserDefinedType import java.math.BigDecimal import java.math.BigInteger import java.time.Instant @@ -21,7 +24,8 @@ data class CqlBitcoinTxPreviewIO( @Table("tx_preview_by_block") data class CqlBitcoinBlockTx( @PrimaryKeyColumn(name = "block_number", ordinal = 0, type = PrimaryKeyType.PARTITIONED) val blockNumber: Long, - @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED) val index: Int, //specify tx number(order) in block + //specify tx number(order) in block + @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED) val index: Int, val hash: String, val fee: BigDecimal, val ins: List, @@ -47,6 +51,7 @@ data class CqlBitcoinBlock( constructor(block: BitcoinBlock) : this( height = block.height, hash = block.hash, time = block.time, nonce = block.nonce, bits = block.bits, merkleroot = block.merkleroot, size = block.size, version = block.version, weight = block.weight, - difficulty = block.difficulty, tx_number = block.txNumber, total_outputs_value = block.totalOutputsAmount.toString() + difficulty = block.difficulty, tx_number = block.txNumber, + total_outputs_value = block.totalOutputsAmount.toString() ) -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt index 0a66514f..f2276eb4 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt @@ -25,7 +25,8 @@ data class CqlBitcoinTx( fun getOutputByNumber(number: Int) = outs.find { out -> out.out == number }!! - fun allAddressesUsedInTransaction() = ins.flatMap { input -> input.addresses } + outs.flatMap { output -> output.addresses } + fun allAddressesUsedInTransaction() = ins.flatMap { input -> input.addresses } + + outs.flatMap { output -> output.addresses } } @UserDefinedType("tx_in") @@ -44,4 +45,4 @@ data class CqlBitcoinTxOut( val asm: String, val out: Int, val required_signatures: Int -) \ No newline at end of file +) diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/TxRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/TxRepositories.kt index 38d1eb1f..32dc3c4b 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/TxRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/TxRepositories.kt @@ -20,4 +20,4 @@ interface BitcoinAddressTxRepository : ReactiveCrudRepository { fun findAllByBlockNumber(blockNumber: Long, options: QueryOptions = QueryOptions.empty()): Flux -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateAddressSummaryRepository.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateAddressSummaryRepository.kt index 1b3be912..9ec05b53 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateAddressSummaryRepository.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/UpdateAddressSummaryRepository.kt @@ -38,7 +38,8 @@ interface BitcoinUpdateAddressSummaryRepository : ReactiveCrudRepository + fun update(@Param("summary") summary: CqlBitcoinAddressSummary, + @Param("oldVersion") oldVersion: Long): Mono /** * Return {@code true} if there is no record for key and insert was successful. @@ -48,9 +49,11 @@ interface BitcoinUpdateAddressSummaryRepository : ReactiveCrudRepository @@ -63,4 +66,4 @@ interface BitcoinUpdateAddressSummaryRepository : ReactiveCrudRepository -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/CqlAddressSummary.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/CqlAddressSummary.kt index 24faffc1..cb3c4281 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/CqlAddressSummary.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/CqlAddressSummary.kt @@ -8,4 +8,4 @@ interface CqlAddressSummary { val kafkaDeltaPartition: Int val kafkaDeltaTopic: String val kafkaDeltaOffsetCommitted: Boolean -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/NoChainCondition.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/NoChainCondition.kt index 870037ed..a2918320 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/NoChainCondition.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/common/NoChainCondition.kt @@ -10,4 +10,4 @@ class NoChainCondition : Condition { override fun matches(context: ConditionContext, metadata: AnnotatedTypeMetadata): Boolean { return context.environment.getProperty(CHAIN) == null } -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/configuration/CassandraConfiguration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/configuration/CassandraConfiguration.kt index f1241a93..5d6c651c 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/configuration/CassandraConfiguration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/configuration/CassandraConfiguration.kt @@ -16,6 +16,8 @@ import org.springframework.data.cassandra.config.AbstractReactiveCassandraConfig const val MAX_CONCURRENT_REQUESTS = 8182 +const val MAX_PER_ROUTE = 16 +const val MAX_TOTAL = 32 val Chain.keyspace: String get() = lowerCaseName @@ -43,8 +45,8 @@ abstract class CassandraRepositoriesConfiguration( class CassandraConfiguration { private val defaultHttpHeaders = listOf(BasicHeader("Keep-Alive", "timeout=10, max=1024")) private val connectionManager = PoolingHttpClientConnectionManager().apply { - defaultMaxPerRoute = 16 - maxTotal = 32 + defaultMaxPerRoute = MAX_PER_ROUTE + maxTotal = MAX_TOTAL } @Bean @@ -55,5 +57,7 @@ class CassandraConfiguration { .build()!! @Bean - fun migrationsLoader(resourceLoader: GenericApplicationContext) = DefaultMigrationsLoader(resourceLoader = resourceLoader) -} \ No newline at end of file + fun migrationsLoader(resourceLoader: GenericApplicationContext) = DefaultMigrationsLoader( + resourceLoader = resourceLoader + ) +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/configuration/EthereumRepositoryConfiguration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/configuration/EthereumRepositoryConfiguration.kt index e472f8c7..39e353b9 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/configuration/EthereumRepositoryConfiguration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/configuration/EthereumRepositoryConfiguration.kt @@ -4,17 +4,30 @@ import com.datastax.driver.core.Cluster import fund.cyber.cassandra.common.NoChainCondition import fund.cyber.cassandra.configuration.CassandraRepositoriesConfiguration import fund.cyber.cassandra.configuration.keyspace -import fund.cyber.cassandra.ethereum.repository.* +import fund.cyber.cassandra.ethereum.repository.EthereumAddressRepository +import fund.cyber.cassandra.ethereum.repository.EthereumBlockRepository +import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository +import fund.cyber.cassandra.ethereum.repository.PageableEthereumAddressTxRepository +import fund.cyber.cassandra.ethereum.repository.PageableEthereumBlockTxRepository import fund.cyber.cassandra.migration.BlockchainMigrationSettings import fund.cyber.cassandra.migration.MigrationSettings -import fund.cyber.search.configuration.* +import fund.cyber.search.configuration.CASSANDRA_HOSTS +import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT +import fund.cyber.search.configuration.CASSANDRA_PORT +import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT +import fund.cyber.search.configuration.CHAIN +import fund.cyber.search.configuration.env import fund.cyber.search.model.chains.Chain import fund.cyber.search.model.chains.EthereumFamilyChain import org.springframework.beans.factory.InitializingBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.* +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Condition +import org.springframework.context.annotation.ConditionContext +import org.springframework.context.annotation.Conditional +import org.springframework.context.annotation.Configuration import org.springframework.context.support.GenericApplicationContext import org.springframework.core.type.AnnotatedTypeMetadata import org.springframework.data.cassandra.ReactiveSession @@ -154,10 +167,11 @@ class EthereumRepositoriesConfiguration : InitializingBean { } - private fun getKeyspaceSession(chain: Chain, converter: MappingCassandraConverter) = CassandraSessionFactoryBean().apply { - setCluster(cluster) - setConverter(converter) - setKeyspaceName(chain.keyspace) - schemaAction = SchemaAction.NONE - } -} \ No newline at end of file + private fun getKeyspaceSession(chain: Chain, converter: MappingCassandraConverter) = CassandraSessionFactoryBean() + .apply { + setCluster(cluster) + setConverter(converter) + setKeyspaceName(chain.keyspace) + schemaAction = SchemaAction.NONE + } +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Address.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Address.kt index a92d688a..0164fd3b 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Address.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Address.kt @@ -34,8 +34,10 @@ data class CqlEthereumAddressSummary( @Table("tx_preview_by_address") data class CqlEthereumAddressTxPreview( - @PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED) val address: String, - @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED, ordering = DESCENDING, name = "block_time") val blockTime: Instant, + @PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED) + val address: String, + @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED, ordering = DESCENDING, name = "block_time") + val blockTime: Instant, @PrimaryKeyColumn(ordinal = 2, type = PrimaryKeyType.CLUSTERED) val hash: String, val fee: BigDecimal, @Column(forceQuote = true) val from: String, @@ -88,4 +90,4 @@ data class CqlEthereumAddressMinedUncle( blockNumber = uncle.blockNumber, blockTime = uncle.blockTime, blockHash = uncle.blockHash, miner = uncle.miner, uncleReward = uncle.uncleReward.toString() ) -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Block.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Block.kt index 7af1946d..a2f95a4e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Block.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/model/Block.kt @@ -53,8 +53,10 @@ data class CqlEthereumBlock( @Table("tx_preview_by_block") data class CqlEthereumBlockTxPreview( - @PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED, value = "block_number") val blockNumber: Long, - @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED, value = "position_in_block") val positionInBlock: Int, + @PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED, value = "block_number") + val blockNumber: Long, + @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED, value = "position_in_block") + val positionInBlock: Int, val fee: BigDecimal, val value: BigDecimal, val hash: String, @@ -70,4 +72,4 @@ data class CqlEthereumBlockTxPreview( from = tx.from, to = (tx.to ?: tx.createdContract)!!, createsContract = tx.createdContract != null ) -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/AddressRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/AddressRepositories.kt index 52965dde..ea61f3ad 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/AddressRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/AddressRepositories.kt @@ -1,6 +1,9 @@ package fund.cyber.cassandra.ethereum.repository -import fund.cyber.cassandra.ethereum.model.* +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressMinedBlock +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressMinedUncle +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressSummary +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressTxPreview import org.springframework.data.cassandra.core.mapping.MapId import org.springframework.data.cassandra.repository.CassandraRepository import org.springframework.data.domain.Pageable @@ -18,4 +21,4 @@ interface EthereumAddressUncleRepository : ReactiveCrudRepository { fun findAllByAddress(address: String, page: Pageable): Slice -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/BlockRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/BlockRepositories.kt index 176ef87a..4756e5fc 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/BlockRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/BlockRepositories.kt @@ -14,4 +14,4 @@ interface EthereumBlockTxRepository : ReactiveCassandraRepository { fun findAllByBlockNumber(blockNumber: Long, page: Pageable): Slice -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/TxRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/TxRepositories.kt index 4a33d900..b0c39846 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/TxRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/TxRepositories.kt @@ -3,4 +3,4 @@ package fund.cyber.cassandra.ethereum.repository import fund.cyber.cassandra.ethereum.model.CqlEthereumTx import org.springframework.data.cassandra.repository.ReactiveCassandraRepository -interface EthereumTxRepository : ReactiveCassandraRepository \ No newline at end of file +interface EthereumTxRepository : ReactiveCassandraRepository diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UncleRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UncleRepositories.kt index 0b796db9..49623152 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UncleRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UncleRepositories.kt @@ -3,4 +3,4 @@ package fund.cyber.cassandra.ethereum.repository import fund.cyber.cassandra.ethereum.model.CqlEthereumUncle import org.springframework.data.repository.reactive.ReactiveCrudRepository -interface EthereumUncleRepository : ReactiveCrudRepository \ No newline at end of file +interface EthereumUncleRepository : ReactiveCrudRepository diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateAddressSummaryRepository.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateAddressSummaryRepository.kt index 3e06572f..268132d8 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateAddressSummaryRepository.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/ethereum/repository/UpdateAddressSummaryRepository.kt @@ -42,7 +42,8 @@ interface EthereumUpdateAddressSummaryRepository : ReactiveCrudRepository + fun update(@Param("summary") summary: CqlEthereumAddressSummary, + @Param("oldVersion") oldVersion: Long): Mono /** * Return {@code true} if there is no record for key and insert was successful. @@ -53,9 +54,10 @@ interface EthereumUpdateAddressSummaryRepository : ReactiveCrudRepository -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElassandraSchemaMigrationEngine.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElassandraSchemaMigrationEngine.kt index 9f7bdbf5..2655351e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElassandraSchemaMigrationEngine.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElassandraSchemaMigrationEngine.kt @@ -4,7 +4,10 @@ import fund.cyber.cassandra.migration.configuration.MigrationRepositoryConfigura import fund.cyber.cassandra.migration.model.CqlSchemaVersion import fund.cyber.cassandra.migration.repository.SchemaVersionRepository import fund.cyber.common.readAsString -import fund.cyber.search.configuration.* +import fund.cyber.search.configuration.CASSANDRA_HOSTS +import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT +import fund.cyber.search.configuration.ELASTIC_HTTP_PORT +import fund.cyber.search.configuration.ELASTIC_HTTP_PORT_DEFAULT import org.apache.http.HttpStatus import org.apache.http.client.HttpClient import org.apache.http.client.methods.RequestBuilder @@ -47,21 +50,23 @@ class ElassandraSchemaMigrationEngine( log.info("Executing elassandra schema update") log.info("Found ${migrations.size} migrations") - migrations.filter { it !is EmptyMigration }.groupBy { m -> m.applicationId }.forEach { applicationId, applicationMigrations -> + migrations.filter { it !is EmptyMigration }.groupBy { m -> m.applicationId } + .forEach { applicationId, applicationMigrations -> - val executedMigrations = schemaVersionRepository.findAllByApplicationId(applicationId) - .map(CqlSchemaVersion::id).collectList().block() ?: emptyList() + val executedMigrations = schemaVersionRepository + .findAllByApplicationId(applicationId) + .map(CqlSchemaVersion::id).collectList().block() ?: emptyList() - applicationMigrations - .filter { migration -> !executedMigrations.contains(migration.id) } - .sortedBy { migration -> migration.id.substringBefore("_").toInt() } - .forEach { migration -> - log.info("Executing '$applicationId' application migration to '${migration.id}' id") - executeMigration(migration) - log.info("Succeeded '$applicationId' application migration to '${migration.id}' id") - } + applicationMigrations + .filter { migration -> !executedMigrations.contains(migration.id) } + .sortedBy { migration -> migration.id.substringBefore("_").toInt() } + .forEach { migration -> + log.info("Executing '$applicationId' application migration to '${migration.id}' id") + executeMigration(migration) + log.info("Succeeded '$applicationId' application migration to '${migration.id}' id") + } - } + } log.info("Elassandra schema update done") } @@ -70,7 +75,9 @@ class ElassandraSchemaMigrationEngine( private fun executeMigration(migration: Migration) { when (migration) { - is CassandraMigration -> migration.getStatements().forEach { statement -> cassandraTemplate.reactiveCqlOperations.execute(statement).block() } + is CassandraMigration -> migration.getStatements().forEach { statement -> + cassandraTemplate.reactiveCqlOperations.execute(statement).block() + } is ElasticHttpMigration -> { val requestWithRelativeUri = migration.getRequest() @@ -103,5 +110,4 @@ class ElassandraSchemaMigrationEngine( cassandraTemplate.reactiveCqlOperations.execute(createSchemaVersionCql).block() } - -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElasticHttpMigration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElasticHttpMigration.kt index 24a924ad..6217c91b 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElasticHttpMigration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/ElasticHttpMigration.kt @@ -40,4 +40,4 @@ class ElasticHttpMigration( .setEntity(ByteArrayEntity(migration.data.toString().toByteArray())) .build() } -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationSettings.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationSettings.kt index fa02d9e9..767e71fa 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationSettings.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationSettings.kt @@ -14,4 +14,4 @@ open class BlockchainMigrationSettings( ) : MigrationSettings( migrationDirectory = chain.lowerCaseName, applicationId = chain.lowerCaseName -) \ No newline at end of file +) diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationsLoader.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationsLoader.kt index 06616261..4706dd17 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationsLoader.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/MigrationsLoader.kt @@ -18,7 +18,9 @@ class DefaultMigrationsLoader( override fun load(settings: MigrationSettings): List { - return resourceLoader.getResources("classpath*:/$migrationsRootDirectory/${settings.migrationDirectory}/*.*").toList() + return resourceLoader + .getResources("classpath*:/$migrationsRootDirectory/${settings.migrationDirectory}/*.*") + .toList() .map { resource -> createMigration(resource, settings) } } @@ -35,4 +37,4 @@ class DefaultMigrationsLoader( else -> EmptyMigration() } } -} \ No newline at end of file +} diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/configuration/MigrationRepositoryConfiguration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/configuration/MigrationRepositoryConfiguration.kt index 0d4185be..98d12dcc 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/configuration/MigrationRepositoryConfiguration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/configuration/MigrationRepositoryConfiguration.kt @@ -1,10 +1,14 @@ package fund.cyber.cassandra.migration.configuration import fund.cyber.cassandra.configuration.CassandraRepositoriesConfiguration -import fund.cyber.search.configuration.* +import fund.cyber.search.configuration.CASSANDRA_HOSTS +import fund.cyber.search.configuration.CASSANDRA_PORT +import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT +import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.* +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration import org.springframework.data.cassandra.ReactiveSession import org.springframework.data.cassandra.config.CassandraSessionFactoryBean import org.springframework.data.cassandra.core.ReactiveCassandraOperations diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/model/Schema.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/model/Schema.kt index 86a8e4b3..61fb3c0f 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/model/Schema.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/model/Schema.kt @@ -11,4 +11,4 @@ data class CqlSchemaVersion( @PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED) val id: String, val migration_hash: Int, val apply_time: Date -) \ No newline at end of file +) diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/repository/SchemaRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/repository/SchemaRepositories.kt index d4879cd9..79e58eb9 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/repository/SchemaRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/migration/repository/SchemaRepositories.kt @@ -5,6 +5,9 @@ import org.springframework.data.cassandra.core.cql.QueryOptions import org.springframework.data.cassandra.repository.ReactiveCassandraRepository import reactor.core.publisher.Flux -interface SchemaVersionRepository: ReactiveCassandraRepository { - fun findAllByApplicationId(applicationId: String, options: QueryOptions = QueryOptions.empty()): Flux -} \ No newline at end of file +interface SchemaVersionRepository : ReactiveCassandraRepository { + + fun findAllByApplicationId( + applicationId: String, options: QueryOptions = QueryOptions.empty() + ): Flux +} diff --git a/common-kafka/src/main/kotlin/fund/cyber/common/kafka/reader/SinglePartitionTopicLastItemsReader.kt b/common-kafka/src/main/kotlin/fund/cyber/common/kafka/reader/SinglePartitionTopicLastItemsReader.kt index 841905e6..508d16a6 100644 --- a/common-kafka/src/main/kotlin/fund/cyber/common/kafka/reader/SinglePartitionTopicLastItemsReader.kt +++ b/common-kafka/src/main/kotlin/fund/cyber/common/kafka/reader/SinglePartitionTopicLastItemsReader.kt @@ -6,6 +6,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition import java.util.* +const val CONSUMER_POLL_TIMEOUT = 100L +const val EMPTY_POLL_MAX_NUMBER = 5 class SinglePartitionTopicLastItemsReader( private val kafkaBrokers: String, @@ -47,9 +49,9 @@ class SinglePartitionTopicLastItemsReader( while (records.size != numberOfRecordsToRead && lastTopicItemOffset - readOffsetNumber >= 0) { consumer.seek(partition, lastTopicItemOffset - readOffsetNumber) - val consumerRecords = consumer.poll(100) + val consumerRecords = consumer.poll(CONSUMER_POLL_TIMEOUT) - if (consumerRecords.isEmpty && emptyPollNumber < 5) { + if (consumerRecords.isEmpty && emptyPollNumber < EMPTY_POLL_MAX_NUMBER) { emptyPollNumber++ continue } @@ -75,4 +77,4 @@ class SinglePartitionTopicLastItemsReader( consumer.seekToEnd(listOf(partition)) return consumer.position(partition) - 1 } -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/BaseForKafkaIntegrationTest.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/BaseForKafkaIntegrationTest.kt index 4ebfca1f..bfac3271 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/BaseForKafkaIntegrationTest.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/BaseForKafkaIntegrationTest.kt @@ -22,4 +22,4 @@ abstract class BaseForKafkaIntegrationTest { @Autowired lateinit var embeddedKafka: KafkaEmbedded -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/ProducerUtils.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/ProducerUtils.kt index 097ced06..2606c600 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/ProducerUtils.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/ProducerUtils.kt @@ -41,4 +41,4 @@ fun producerProperties(kafkaBrokers: String) = Properties().apply { put(ProducerConfig.BATCH_SIZE_CONFIG, 16384) put(ProducerConfig.LINGER_MS_CONFIG, 1) put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432) -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/SinglePartitionTopicDataPresentLatch.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/SinglePartitionTopicDataPresentLatch.kt index 34c46865..1f517a84 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/SinglePartitionTopicDataPresentLatch.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/SinglePartitionTopicDataPresentLatch.kt @@ -44,4 +44,4 @@ class SinglePartitionTopicDataPresentLatch( fun await() { countDownLatch.await() } -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionLackOfRecordsReaderTest.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionLackOfRecordsReaderTest.kt index 79a1a147..b2e454fb 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionLackOfRecordsReaderTest.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionLackOfRecordsReaderTest.kt @@ -48,4 +48,4 @@ class SinglePartitionLackOfRecordsReaderTest : BaseForKafkaIntegrationTest() { Assertions.assertEquals(4, records.size) (0 until itemsCount).forEach { Assertions.assertEquals(itemsCount - it - 1, records[it].second) } } -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionMultipleTransactionRecordReaderTest.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionMultipleTransactionRecordReaderTest.kt index 7d17bb5a..52f1cb5e 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionMultipleTransactionRecordReaderTest.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionMultipleTransactionRecordReaderTest.kt @@ -46,4 +46,4 @@ class SinglePartitionMultipleTransactionRecordReaderTest : BaseForKafkaIntegrati Assertions.assertEquals(4, records.size) (0 until itemsCount).forEach { Assertions.assertEquals(itemsCount - it - 1, records[it].second) } } -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionNonRecordsReaderTest.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionNonRecordsReaderTest.kt index 91a84d7f..3ab5945e 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionNonRecordsReaderTest.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionNonRecordsReaderTest.kt @@ -39,4 +39,4 @@ class SinglePartitionNonRecordsReaderTest : BaseForKafkaIntegrationTest() { val records = reader.readLastRecords(1) assertEquals(0, records.size) } -} \ No newline at end of file +} diff --git a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionSingeTransactionRecordReaderTest.kt b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionSingeTransactionRecordReaderTest.kt index 30101d4d..ec468eba 100644 --- a/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionSingeTransactionRecordReaderTest.kt +++ b/common-kafka/src/test/kotlin/fund/cyber/common/kafka/reader/SinglePartitionSingeTransactionRecordReaderTest.kt @@ -47,4 +47,4 @@ class SinglePartitionSingeTransactionRecordReaderTest : BaseForKafkaIntegrationT Assertions.assertEquals("key", records.first().first) Assertions.assertEquals(1, records.first().second) } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/common/Concurrency.kt b/common/src/main/kotlin/fund/cyber/common/Concurrency.kt index d386612d..28d389b0 100644 --- a/common/src/main/kotlin/fund/cyber/common/Concurrency.kt +++ b/common/src/main/kotlin/fund/cyber/common/Concurrency.kt @@ -6,4 +6,4 @@ fun List>.await(): List { val futures = this.toTypedArray() return CompletableFuture.allOf(*futures) .thenApply { futures.map { future -> future.get() } }.get() -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/common/IO.kt b/common/src/main/kotlin/fund/cyber/common/IO.kt index f1ef6292..da1a07b9 100644 --- a/common/src/main/kotlin/fund/cyber/common/IO.kt +++ b/common/src/main/kotlin/fund/cyber/common/IO.kt @@ -5,4 +5,4 @@ import java.nio.charset.Charset fun InputStream.readAsString(charset: Charset = Charsets.UTF_8): String { return this.reader(charset).use { it.readText() } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/common/Math.kt b/common/src/main/kotlin/fund/cyber/common/Math.kt index bc5cbe99..5108fde9 100644 --- a/common/src/main/kotlin/fund/cyber/common/Math.kt +++ b/common/src/main/kotlin/fund/cyber/common/Math.kt @@ -44,6 +44,7 @@ inline fun Iterable.sum(): BigDecimal { inline fun String.hexToLong(): Long = java.lang.Long.decode(this) -val decimal8 = BigDecimal(8) -val decimal32 = BigDecimal(32) +@Suppress("MagicNumber") val decimal8 = BigDecimal(8) +@Suppress("MagicNumber") val decimal32 = BigDecimal(32) +const val DECIMAL_SCALE = 18 diff --git a/common/src/main/kotlin/fund/cyber/common/StackCache.kt b/common/src/main/kotlin/fund/cyber/common/StackCache.kt index a92202ab..62b38cd5 100644 --- a/common/src/main/kotlin/fund/cyber/common/StackCache.kt +++ b/common/src/main/kotlin/fund/cyber/common/StackCache.kt @@ -29,4 +29,4 @@ class StackCache( _elements[--first] } } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/configuration/Env.kt b/common/src/main/kotlin/fund/cyber/search/configuration/Env.kt index 4d5d9813..7f1f5f52 100644 --- a/common/src/main/kotlin/fund/cyber/search/configuration/Env.kt +++ b/common/src/main/kotlin/fund/cyber/search/configuration/Env.kt @@ -40,4 +40,4 @@ inline fun env(name: String, default: T): T = Int::class -> (System.getenv(name)?.toIntOrNull() ?: default) as T Long::class -> (System.getenv(name)?.toLongOrNull() ?: default) as T else -> default - } \ No newline at end of file + } diff --git a/common/src/main/kotlin/fund/cyber/search/model/JsonRpc.kt b/common/src/main/kotlin/fund/cyber/search/model/JsonRpc.kt index aeaf817c..f8fa9a14 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/JsonRpc.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/JsonRpc.kt @@ -18,4 +18,4 @@ open class Response( data class Error( val code: Int, val message: String -) \ No newline at end of file +) diff --git a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Address.kt b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Address.kt index e0d5d95d..7e6b3b25 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Address.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Address.kt @@ -20,4 +20,4 @@ data class BitcoinAddressTx( val fee: BigDecimal, val ins: List, val outs: List -) : BitcoinItem \ No newline at end of file +) : BitcoinItem diff --git a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/BitcoinJsonRpc.kt b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/BitcoinJsonRpc.kt index c9f9811c..6b89475b 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/BitcoinJsonRpc.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/BitcoinJsonRpc.kt @@ -109,4 +109,4 @@ private class TransactionInputDeserializer : JsonDeserializer out.out == number }!! - fun allAddressesUsedInTransaction() = ins.flatMap { input -> input.addresses } + outs.flatMap { output -> output.addresses } + fun allAddressesUsedInTransaction() = ins.flatMap { input -> input.addresses } + + outs.flatMap { output -> output.addresses } } data class BitcoinTxIn( @@ -37,4 +38,4 @@ data class BitcoinTxOut( val asm: String, val out: Int, val requiredSignatures: Int -) \ No newline at end of file +) diff --git a/common/src/main/kotlin/fund/cyber/search/model/chains/Bitcoin.kt b/common/src/main/kotlin/fund/cyber/search/model/chains/Bitcoin.kt index 55cb8099..40d6bd49 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/chains/Bitcoin.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/chains/Bitcoin.kt @@ -11,4 +11,4 @@ enum class BitcoinFamilyChainEntity : ChainEntity { BLOCK, TRANSACTION, ADDRESS -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/chains/Ethereum.kt b/common/src/main/kotlin/fund/cyber/search/model/chains/Ethereum.kt index 914ffda1..c0de085e 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/chains/Ethereum.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/chains/Ethereum.kt @@ -3,4 +3,4 @@ package fund.cyber.search.model.chains enum class EthereumFamilyChain : Chain { ETHEREUM, ETHEREUM_CLASSIC -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Block.kt b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Block.kt index 7afe5364..11c6143b 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Block.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Block.kt @@ -5,6 +5,9 @@ import java.math.BigDecimal import java.math.BigInteger import java.time.Instant +const val ETHEREUM_CLASSIC_REWARD_CHANGED_BLOCK_NUMBER = 5000000 +const val ETHEREUM_REWARD_CHANGED_BLOCK_NUMBER = 4370000 + data class EthereumBlock( val number: Long, //parsed from hex val hash: String, @@ -33,8 +36,8 @@ data class EthereumBlock( //todo: add properly support of new classic fork fun getBlockReward(chain: EthereumFamilyChain, number: Long): BigDecimal { return if (chain == EthereumFamilyChain.ETHEREUM_CLASSIC) { - if (number < 5000000) BigDecimal("5") else BigDecimal("4") + if (number < ETHEREUM_CLASSIC_REWARD_CHANGED_BLOCK_NUMBER) BigDecimal("5") else BigDecimal("4") } else { - if (number < 4370000) BigDecimal("5") else BigDecimal("3") + if (number < ETHEREUM_REWARD_CHANGED_BLOCK_NUMBER) BigDecimal("5") else BigDecimal("3") } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Transaction.kt b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Transaction.kt index 01c27415..0fe53237 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Transaction.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Transaction.kt @@ -23,4 +23,4 @@ data class EthereumTx( val createdContract: String? //creates contract hash ) { fun addressesUsedInTransaction() = listOfNotNull(from, to, createdContract).filter { address -> !address.isEmpty() } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Uncle.kt b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Uncle.kt index 93f0fc8e..cdfbd6f8 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/ethereum/Uncle.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/ethereum/Uncle.kt @@ -7,6 +7,8 @@ import java.math.BigDecimal import java.math.RoundingMode import java.time.Instant +const val DIVIDE_SCALE = 18 + data class EthereumUncle( val hash: String, val position: Int, @@ -23,9 +25,9 @@ fun getUncleReward(chain: EthereumFamilyChain, uncleNumber: Long, blockNumber: L val blockReward = getBlockReward(chain, blockNumber) return if (chain == EthereumFamilyChain.ETHEREUM_CLASSIC) { - getBlockReward(chain, blockNumber).divide(decimal32, 18, RoundingMode.FLOOR).stripTrailingZeros() + getBlockReward(chain, blockNumber).divide(decimal32, DIVIDE_SCALE, RoundingMode.FLOOR).stripTrailingZeros() } else { ((uncleNumber.toBigDecimal() + decimal8 - blockNumber.toBigDecimal()) * blockReward) - .divide(decimal8, 18, RoundingMode.FLOOR).stripTrailingZeros() + .divide(decimal8, DIVIDE_SCALE, RoundingMode.FLOOR).stripTrailingZeros() } -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/events/PumpEvents.kt b/common/src/main/kotlin/fund/cyber/search/model/events/PumpEvents.kt index df1fc23b..16a2d5ca 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/events/PumpEvents.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/events/PumpEvents.kt @@ -5,4 +5,4 @@ enum class PumpEvent { NEW_BLOCK, NEW_POOL_TX, DROPPED_BLOCK; -} \ No newline at end of file +} diff --git a/common/src/main/kotlin/fund/cyber/search/model/events/Topics.kt b/common/src/main/kotlin/fund/cyber/search/model/events/Topics.kt index 22ab44f0..3b68b82f 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/events/Topics.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/events/Topics.kt @@ -5,4 +5,4 @@ import fund.cyber.search.model.chains.Chain val Chain.txPumpTopic: String get() = name + "_TX_PUMP" val Chain.blockPumpTopic: String get() = name + "_BLOCK_PUMP" -val Chain.unclePumpTopic: String get() = name + "_UNCLE_PUMP" \ No newline at end of file +val Chain.unclePumpTopic: String get() = name + "_UNCLE_PUMP" diff --git a/common/src/test/kotlin/fund/cyber/node/common/StackCacheTest.kt b/common/src/test/kotlin/fund/cyber/node/common/StackCacheTest.kt index 2f6f55c0..11a804cd 100644 --- a/common/src/test/kotlin/fund/cyber/node/common/StackCacheTest.kt +++ b/common/src/test/kotlin/fund/cyber/node/common/StackCacheTest.kt @@ -44,4 +44,4 @@ class StackCacheTest { assertEquals(5, stack.pop()) assertEquals(null, stack.pop()) } -} \ No newline at end of file +} diff --git a/detekt.yml b/detekt.yml index 4bd60b22..11acd2bf 100644 --- a/detekt.yml +++ b/detekt.yml @@ -149,7 +149,7 @@ empty-blocks: active: true exceptions: - active: true + active: false ExceptionRaisedInUnexpectedLocation: active: false methodNames: 'toString,hashCode,equals,finalize' @@ -215,7 +215,7 @@ naming: functionPattern: '^([a-z$][a-zA-Z$0-9]*)|(`.*`)$' excludeClassPattern: '$^' MatchingDeclarationName: - active: true + active: false MemberNameEqualsClassName: active: false ignoreOverriddenFunction: true @@ -243,7 +243,7 @@ naming: excludeClassPattern: '$^' performance: - active: true + active: false ForEachOnRange: active: true SpreadOperator: @@ -296,7 +296,7 @@ style: ExpressionBodySyntax: active: false ForbiddenComment: - active: true + active: false values: 'TODO:,FIXME:,STOPSHIP:' ForbiddenImport: active: false @@ -345,7 +345,7 @@ style: active: false ReturnCount: active: true - max: 2 + max: 3 excludedFunctions: "equals" SafeCast: active: true diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/DumpApplication.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/DumpApplication.kt index 4f67120c..b1364553 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/DumpApplication.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/DumpApplication.kt @@ -15,4 +15,4 @@ class BitcoinDumpApplication { SpringApplication(BitcoinDumpApplication::class.java).run(*args) } } -} \ No newline at end of file +} diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/ApplicationConfiguration.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/ApplicationConfiguration.kt index e19e6f8d..2552e731 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/ApplicationConfiguration.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/ApplicationConfiguration.kt @@ -22,6 +22,7 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler import org.springframework.kafka.listener.config.ContainerProperties +private const val AUTO_COMMIT_INTERVAL_MS_CONFIG = 10 * 1000L //todo add dump of tx, block tx, address tx @EnableKafka @@ -65,7 +66,7 @@ class ApplicationConfiguration { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 10 * 1000, + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to AUTO_COMMIT_INTERVAL_MS_CONFIG, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase() ) -} \ No newline at end of file +} diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt index 0de15636..b3eba43c 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt @@ -30,4 +30,4 @@ class BlockDumpProcess( blockRepository.saveAll(blocksToSave).collectList().block() } -} \ No newline at end of file +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/DumpApplication.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/DumpApplication.kt index 1ce02ba1..85da5c6e 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/DumpApplication.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/DumpApplication.kt @@ -16,4 +16,4 @@ class EthereumDumpApplication { SpringApplication(EthereumDumpApplication::class.java).run(*args) } } -} \ No newline at end of file +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/ApplcationConfiguration.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/ApplcationConfiguration.kt index d2d29930..a67a9ce6 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/ApplcationConfiguration.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/ApplcationConfiguration.kt @@ -1,6 +1,12 @@ package fund.cyber.dump.ethereum -import fund.cyber.cassandra.ethereum.repository.* +import fund.cyber.cassandra.ethereum.repository.EthereumAddressMinedBlockRepository +import fund.cyber.cassandra.ethereum.repository.EthereumAddressTxRepository +import fund.cyber.cassandra.ethereum.repository.EthereumAddressUncleRepository +import fund.cyber.cassandra.ethereum.repository.EthereumBlockRepository +import fund.cyber.cassandra.ethereum.repository.EthereumBlockTxRepository +import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository +import fund.cyber.cassandra.ethereum.repository.EthereumUncleRepository import fund.cyber.common.kafka.JsonDeserializer import fund.cyber.search.configuration.CHAIN import fund.cyber.search.configuration.KAFKA_BROKERS @@ -27,6 +33,8 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler import org.springframework.kafka.listener.config.ContainerProperties +private const val POLL_TIMEOUT = 5000L +private const val AUTO_COMMIT_INTERVAL_MS_CONFIG = 10 * 1000L @Configuration class ApplicationConfiguration { @@ -77,8 +85,9 @@ class ApplicationConfiguration { //todo add to error handler exponential wait before retries val containerProperties = ContainerProperties(chain().blockPumpTopic).apply { - messageListener = BlockDumpProcess(ethereumBlockRepository, ethereumAddressMinedBlockRepository, chain(), monitoring) - pollTimeout = 5000 + messageListener = BlockDumpProcess(ethereumBlockRepository, ethereumAddressMinedBlockRepository, chain(), + monitoring) + pollTimeout = POLL_TIMEOUT setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) } @@ -98,8 +107,9 @@ class ApplicationConfiguration { //todo add to error handler exponential wait before retries val containerProperties = ContainerProperties(chain().txPumpTopic).apply { - messageListener = TxDumpProcess(ethereumTxRepository, ethereumBlockTxRepository, ethereumAddressTxRepository, chain(), monitoring) - pollTimeout = 5000 + messageListener = TxDumpProcess(ethereumTxRepository, ethereumBlockTxRepository, + ethereumAddressTxRepository, chain(), monitoring) + pollTimeout = POLL_TIMEOUT setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) } @@ -119,8 +129,9 @@ class ApplicationConfiguration { //todo add to error handler exponential wait before retries val containerProperties = ContainerProperties(chain().unclePumpTopic).apply { - messageListener = UncleDumpProcess(ethereumUncleRepository, ethereumAddressUncleRepository, chain(), monitoring) - pollTimeout = 5000 + messageListener = UncleDumpProcess(ethereumUncleRepository, ethereumAddressUncleRepository, chain(), + monitoring) + pollTimeout = POLL_TIMEOUT setBatchErrorHandler(SeekToCurrentBatchErrorHandler()) } @@ -131,9 +142,8 @@ class ApplicationConfiguration { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 10 * 1000, + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to AUTO_COMMIT_INTERVAL_MS_CONFIG, ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase() ) - -} \ No newline at end of file +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt index 2b2d14f5..c171f875 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt @@ -54,4 +54,4 @@ class BlockDumpProcess( } } -} \ No newline at end of file +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt index 7f6a50b3..ee2c0c8b 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt @@ -1,7 +1,11 @@ package fund.cyber.dump.ethereum -import fund.cyber.cassandra.ethereum.model.* -import fund.cyber.cassandra.ethereum.repository.* +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressTxPreview +import fund.cyber.cassandra.ethereum.model.CqlEthereumBlockTxPreview +import fund.cyber.cassandra.ethereum.model.CqlEthereumTx +import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository +import fund.cyber.cassandra.ethereum.repository.EthereumBlockTxRepository +import fund.cyber.cassandra.ethereum.repository.EthereumAddressTxRepository import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumTx import fund.cyber.search.model.events.PumpEvent @@ -56,4 +60,4 @@ class TxDumpProcess( } } -} \ No newline at end of file +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt index 6f499da3..ff95628b 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt @@ -1,7 +1,9 @@ package fund.cyber.dump.ethereum -import fund.cyber.cassandra.ethereum.model.* -import fund.cyber.cassandra.ethereum.repository.* +import fund.cyber.cassandra.ethereum.model.CqlEthereumAddressMinedUncle +import fund.cyber.cassandra.ethereum.model.CqlEthereumUncle +import fund.cyber.cassandra.ethereum.repository.EthereumUncleRepository +import fund.cyber.cassandra.ethereum.repository.EthereumAddressUncleRepository import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumUncle import fund.cyber.search.model.events.PumpEvent @@ -50,4 +52,4 @@ class UncleDumpProcess( } } -} \ No newline at end of file +} diff --git a/pumps-old/src/main/kotlin/fund/cyber/pump/BlockchainInterface.kt b/pumps-old/src/main/kotlin/fund/cyber/pump/BlockchainInterface.kt index 3626f6a4..f4bace15 100644 --- a/pumps-old/src/main/kotlin/fund/cyber/pump/BlockchainInterface.kt +++ b/pumps-old/src/main/kotlin/fund/cyber/pump/BlockchainInterface.kt @@ -27,4 +27,4 @@ interface BlockBundle { interface TxPoolInterface { fun onNewTransaction(action: (T)->Unit) -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/PumpApplication.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/PumpApplication.kt index 8e66ba8f..51b4eb02 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/PumpApplication.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/PumpApplication.kt @@ -28,4 +28,4 @@ fun main(args: Array) { val applicationContext = application.run(*args) val pump = applicationContext.getBean(ChainPump::class.java) pump.startPump() -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinBlockchainInterface.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinBlockchainInterface.kt index 5db08c21..e0c4c726 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinBlockchainInterface.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinBlockchainInterface.kt @@ -32,4 +32,4 @@ class BitcoinBlockchainInterface( val bundle = rpcToBundleEntitiesConverter.convertToBundle(block) return if (number == 0L) genesisDataProvider.provide(bundle) else bundle } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinClientConfiguration.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinClientConfiguration.kt index a5fd421b..fa312745 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinClientConfiguration.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinClientConfiguration.kt @@ -6,13 +6,16 @@ import org.apache.http.message.BasicHeader import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +const val MAX_PER_ROUTE = 16 +const val MAX_TOTAL = 32 + @Configuration class BitcoinClientConfiguration { private val defaultHttpHeaders = listOf(BasicHeader("Keep-Alive", "timeout=10, max=1024")) private val connectionManager = PoolingHttpClientConnectionManager().apply { - defaultMaxPerRoute = 16 - maxTotal = 32 + defaultMaxPerRoute = MAX_PER_ROUTE + maxTotal = MAX_TOTAL } @Bean @@ -22,4 +25,4 @@ class BitcoinClientConfiguration { .setDefaultHeaders(defaultHttpHeaders) .build()!! -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinJsonRpcClient.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinJsonRpcClient.kt index 003d9a35..ad47f26e 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinJsonRpcClient.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/BitcoinJsonRpcClient.kt @@ -112,4 +112,4 @@ class TxMempoolResponse : Response>() class TxResponse : Response() class StringResponse : Response() class LongResponse : Response() -class BlockResponse : Response() \ No newline at end of file +class BlockResponse : Response() diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinBlockConverter.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinBlockConverter.kt index aee093da..51a44f29 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinBlockConverter.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinBlockConverter.kt @@ -19,11 +19,11 @@ class JsonRpcToDaoBitcoinBlockConverter { val totalOutputsValue = transactions.flatMap { tx -> tx.outs }.map { out -> out.amount }.sum() return BitcoinBlock( - hash = jsonRpcBlock.hash, size = jsonRpcBlock.size, version = jsonRpcBlock.version, bits = jsonRpcBlock.bits, - difficulty = jsonRpcBlock.difficulty.toBigInteger(), nonce = jsonRpcBlock.nonce, - time = Instant.ofEpochSecond(jsonRpcBlock.time), weight = jsonRpcBlock.weight, - merkleroot = jsonRpcBlock.merkleroot, height = jsonRpcBlock.height, + hash = jsonRpcBlock.hash, size = jsonRpcBlock.size, version = jsonRpcBlock.version, + bits = jsonRpcBlock.bits, difficulty = jsonRpcBlock.difficulty.toBigInteger(), + nonce = jsonRpcBlock.nonce, time = Instant.ofEpochSecond(jsonRpcBlock.time), + weight = jsonRpcBlock.weight, merkleroot = jsonRpcBlock.merkleroot, height = jsonRpcBlock.height, txNumber = jsonRpcBlock.tx.size, totalOutputsAmount = totalOutputsValue ) } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinTxConverter.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinTxConverter.kt index e7ad934c..7419a7ab 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinTxConverter.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/JsonRpcToDaoBitcoinTxConverter.kt @@ -1,7 +1,14 @@ package fund.cyber.pump.bitcoin.client import fund.cyber.common.sum -import fund.cyber.search.model.bitcoin.* +import fund.cyber.search.model.bitcoin.BitcoinTx +import fund.cyber.search.model.bitcoin.BitcoinTxIn +import fund.cyber.search.model.bitcoin.BitcoinTxOut +import fund.cyber.search.model.bitcoin.CoinbaseTransactionInput +import fund.cyber.search.model.bitcoin.JsonRpcBitcoinBlock +import fund.cyber.search.model.bitcoin.JsonRpcBitcoinTransaction +import fund.cyber.search.model.bitcoin.JsonRpcBitcoinTransactionOutput +import fund.cyber.search.model.bitcoin.RegularTransactionInput import org.slf4j.LoggerFactory import java.math.BigDecimal.ZERO import java.time.Instant @@ -129,4 +136,4 @@ class JsonRpcToDaoBitcoinTxConverter { requiredSignatures = jsonRpcTxOut.scriptPubKey.reqSigs ) } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/genesis/BitcoinGenesisBundleProvider.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/genesis/BitcoinGenesisBundleProvider.kt index 8e13c73b..5162e76d 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/genesis/BitcoinGenesisBundleProvider.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/genesis/BitcoinGenesisBundleProvider.kt @@ -95,4 +95,4 @@ data class TxIn( data class TxOut( val addresses: List, val amount: BigDecimal -) \ No newline at end of file +) diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducer.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducer.kt index eacf1247..b9eb8267 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducer.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducer.kt @@ -21,4 +21,4 @@ class BitcoinBlockBundleProducer( blockBundle.transactions.forEach { tx -> kafkaTemplate.send(chain.txPumpTopic, PumpEvent.NEW_BLOCK, tx) } } } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducerConfiguration.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducerConfiguration.kt index 6d0886db..8f98764c 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducerConfiguration.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/BitcoinBundleProducerConfiguration.kt @@ -55,9 +55,10 @@ class BitcoinBundleProducerConfiguration { @Bean fun producerFactory(): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()).apply { - setTransactionIdPrefix(chain.name + "_PUMP") - } + return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()) + .apply { + setTransactionIdPrefix(chain.name + "_PUMP") + } } @Bean @@ -74,4 +75,4 @@ class BitcoinBundleProducerConfiguration { fun transactionManager(): KafkaTransactionManager { return KafkaTransactionManager(producerFactory()) } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/LastPumpedBitcoinBundlesProvider.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/LastPumpedBitcoinBundlesProvider.kt index 024f486b..dfa8c524 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/LastPumpedBitcoinBundlesProvider.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/kafka/LastPumpedBitcoinBundlesProvider.kt @@ -40,4 +40,4 @@ class LastPumpedBitcoinBundlesProvider( return listOf(event to bundle) } -} \ No newline at end of file +} diff --git a/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt b/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt index 8defef9c..dd4a7878 100644 --- a/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt +++ b/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt @@ -133,4 +133,4 @@ class BtcdToDaoTxConverterTest { Assertions.assertEquals(expectedRegularTx, regularTx) } -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/ChainPump.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/ChainPump.kt index a08c1bcb..67abdd32 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/ChainPump.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/ChainPump.kt @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicLong private val log = LoggerFactory.getLogger(ChainPump::class.java)!! +private const val BLOCK_BUFFER_TIMESPAN = 3L //todo add chain reorganisation @Component @@ -50,7 +51,7 @@ class ChainPump( val kafkaWriteMonitor = monitoring.timer("pump_bundle_kafka_store") flowableBlockchainInterface.subscribeBlocks(startBlockNumber) - .buffer(3, TimeUnit.SECONDS) + .buffer(BLOCK_BUFFER_TIMESPAN, TimeUnit.SECONDS) .blockingSubscribe( { blockBundles -> if (blockBundles.isEmpty()) return@blockingSubscribe @@ -78,4 +79,4 @@ class ChainPump( } class ChainReindexationException : RuntimeException() -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/CommonConfiguration.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/CommonConfiguration.kt index 3ed766ae..8818a65a 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/CommonConfiguration.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/CommonConfiguration.kt @@ -33,7 +33,8 @@ private val log = LoggerFactory.getLogger(DefaultRetryListenerSupport::class.jav class DefaultRetryListenerSupport: RetryListenerSupport() { - override fun onError(context: RetryContext, callback: RetryCallback?, throwable: Throwable) { + override fun onError(context: RetryContext, callback: RetryCallback?, + throwable: Throwable) { if (context.retryCount == 1) log.error("Error occurred. Start retrying...", throwable) super.onError(context, callback, throwable) } @@ -57,7 +58,8 @@ class CommonConfiguration { @Bean fun commonPumpConsumer(): Consumer { - return KafkaConsumer(consumerProperties(), JsonDeserializer(Any::class.java), JsonDeserializer(Any::class.java)) + return KafkaConsumer(consumerProperties(), JsonDeserializer(Any::class.java), + JsonDeserializer(Any::class.java)) } @Bean @@ -84,4 +86,4 @@ class CommonConfiguration { put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase()) } -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt index 384b77df..99464a07 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt @@ -5,4 +5,4 @@ import fund.cyber.pump.common.node.BlockBundle interface KafkaBlockBundleProducer { fun storeBlockBundle(blockBundles: List) -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/LastPumpedBlocksProvider.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/LastPumpedBlocksProvider.kt index 70e4c810..c9bad6f8 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/LastPumpedBlocksProvider.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/LastPumpedBlocksProvider.kt @@ -5,4 +5,4 @@ import fund.cyber.search.model.events.PumpEvent interface LastPumpedBundlesProvider { fun getLastBlockBundles(): List> -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/monitoring/LastNetworkBlockNumberMonitoring.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/monitoring/LastNetworkBlockNumberMonitoring.kt index 95e86d0b..b0674621 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/monitoring/LastNetworkBlockNumberMonitoring.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/monitoring/LastNetworkBlockNumberMonitoring.kt @@ -2,10 +2,14 @@ package fund.cyber.pump.common.monitoring import fund.cyber.pump.common.node.FlowableBlockchainInterface import io.micrometer.core.instrument.MeterRegistry +import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import java.util.concurrent.atomic.AtomicLong +private val log = LoggerFactory.getLogger(LastNetworkBlockNumberMonitoring::class.java)!! + +private const val LAST_NETWORK_BLOCK_NUMBER_TIMEOUT = 10 * 1000L @Component class LastNetworkBlockNumberMonitoring( @@ -16,11 +20,13 @@ class LastNetworkBlockNumberMonitoring( private val lastProcessedBlockMonitor = monitoring .gauge("pump_last_network_block", AtomicLong(blockchainInterface.lastNetworkBlock()))!! - @Scheduled(fixedRate = 10 * 1000) + @Scheduled(fixedRate = LAST_NETWORK_BLOCK_NUMBER_TIMEOUT) fun getLastNetworkBlockNumber() { try { val lastNetworkBlock = blockchainInterface.lastNetworkBlock() lastProcessedBlockMonitor.set(lastNetworkBlock) - } catch (e: Exception) {} + } catch (e: Exception) { + log.error("Error getting network block number", e) + } } -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/BlockchainInterface.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/BlockchainInterface.kt index cb7dc452..e322bf81 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/BlockchainInterface.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/BlockchainInterface.kt @@ -13,4 +13,4 @@ interface BlockBundle { interface BlockchainInterface { fun lastNetworkBlock(): Long fun blockBundleByNumber(number: Long): T -} \ No newline at end of file +} diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/FlowableBlockchainInterface.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/FlowableBlockchainInterface.kt index 6f9c2c82..6e924a34 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/FlowableBlockchainInterface.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/node/FlowableBlockchainInterface.kt @@ -18,6 +18,9 @@ interface FlowableBlockchainInterface : BlockchainInterface private val log = LoggerFactory.getLogger(ConcurrentPulledBlockchain::class.java)!! +private const val REAL_TIME_BLOCK_QUERYING_TIMEOUT = 1000L +private const val MAX_CONCURRENCY = 20 + class ConcurrentPulledBlockchain( private val blockchainInterface: BlockchainInterface, private val batchSize: Int = 20, @@ -36,7 +39,7 @@ class ConcurrentPulledBlockchain( lastNetworkBlock = retryTemplate.execute { lastNetworkBlock() } if (nextBlockNumber > lastNetworkBlock) { log.debug("Up-to-date block $nextBlockNumber") - sleep(1000) + sleep(REAL_TIME_BLOCK_QUERYING_TIMEOUT) emitter.onNext(-1L..-1L) return@BiFunction nextBlockNumber } @@ -54,7 +57,8 @@ class ConcurrentPulledBlockchain( // 2) download each group member in parallel override fun subscribeBlocks(startBlockNumber: Long): Flowable { - return Flowable.generate(Callable { startBlockNumber }, generateAvailableBlocksNumbersRangesFunction) + return Flowable + .generate(Callable { startBlockNumber }, generateAvailableBlocksNumbersRangesFunction) .flatMap({ blockNumbers -> asyncDownloadBlocks(blockNumbers) }, 1) } @@ -64,7 +68,7 @@ class ConcurrentPulledBlockchain( log.debug("Looking for ${blockNumbers.first}-${blockNumbers.last} blocks") return blockNumbers.toFlowable() - .flatMap({ number -> asyncDownloadBlock(number) }, 20) + .flatMap({ number -> asyncDownloadBlock(number) }, MAX_CONCURRENCY) .sorted { o1, o2 -> o1.number.compareTo(o2.number) } } diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/PumpApplication.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/PumpApplication.kt index c3ffe805..1dc4171e 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/PumpApplication.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/PumpApplication.kt @@ -30,4 +30,4 @@ class EthereumPumpApplication { pump.startPump() } } -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt index feed62fe..8f486426 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt @@ -59,10 +59,12 @@ class EthereumBlockchainInterface( private fun downloadUnclesData(ethBlock: EthBlock): List { val unclesFutures = ethBlock.block.uncles.mapIndexed { index, _ -> - parityClient.ethGetUncleByBlockHashAndIndex(ethBlock.block.hash, BigInteger.valueOf(index.toLong())).sendAsync() + parityClient + .ethGetUncleByBlockHashAndIndex(ethBlock.block.hash, BigInteger.valueOf(index.toLong())) + .sendAsync() } return unclesFutures.await().map { uncleEthBlock -> uncleEthBlock.block } } private fun blockParameter(blockNumber: BigInteger) = DefaultBlockParameter.valueOf(blockNumber)!! -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt index 19e10e00..952665a3 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt @@ -11,13 +11,16 @@ import org.springframework.context.annotation.Configuration import org.web3j.protocol.Web3j import org.web3j.protocol.http.HttpService +const val MAX_PER_ROUTE = 16 +const val MAX_TOTAL = 32 + @Configuration class EthereumClientConfiguration { private val defaultHttpHeaders = listOf(BasicHeader("Keep-Alive", "timeout=10, max=1024")) private val connectionManager = PoolingHttpClientConnectionManager().apply { - defaultMaxPerRoute = 16 - maxTotal = 32 + defaultMaxPerRoute = MAX_PER_ROUTE + maxTotal = MAX_TOTAL } private val endpointUrl = env(CHAIN_NODE_URL, ETHEREUM_CHAIN_NODE_DEFAULT_URL) @@ -31,4 +34,4 @@ class EthereumClientConfiguration { @Bean fun parityClient() = Web3j.build(HttpService(endpointUrl, httpClient()))!! -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt index 120749a1..dffcda8b 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt @@ -1,9 +1,16 @@ package fund.cyber.pump.ethereum.client +import fund.cyber.common.DECIMAL_SCALE +import fund.cyber.common.decimal32 import fund.cyber.common.hexToLong import fund.cyber.common.sum import fund.cyber.search.model.chains.EthereumFamilyChain -import fund.cyber.search.model.ethereum.* +import fund.cyber.search.model.ethereum.EthereumBlock +import fund.cyber.search.model.ethereum.EthereumTx +import fund.cyber.search.model.ethereum.EthereumUncle +import fund.cyber.search.model.ethereum.getBlockReward +import fund.cyber.search.model.ethereum.getUncleReward +import fund.cyber.search.model.ethereum.weiToEthRate import org.springframework.stereotype.Component import org.web3j.protocol.core.methods.response.EthBlock import java.math.BigDecimal @@ -70,7 +77,7 @@ class ParityToEthereumBundleConverter( val number = parityBlock.numberRaw.hexToLong() val blockReward = getBlockReward(chain, number) val uncleReward = (blockReward * parityBlock.uncles.size.toBigDecimal()) - .divide(32.toBigDecimal(), 18, RoundingMode.FLOOR).stripTrailingZeros() + .divide(decimal32, DECIMAL_SCALE, RoundingMode.FLOOR).stripTrailingZeros() return EthereumBlock( hash = parityBlock.hash, parentHash = parityBlock.parentHash, number = number, diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/genesis/EthereumGenesisBundleProvider.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/genesis/EthereumGenesisBundleProvider.kt index e5ff9013..bc29416d 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/genesis/EthereumGenesisBundleProvider.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/genesis/EthereumGenesisBundleProvider.kt @@ -78,4 +78,4 @@ class EthereumGenesisFile( class Balance( var balance: String? -) \ No newline at end of file +) diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBlockBundleProducer.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBlockBundleProducer.kt index 5857987f..93cd40ec 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBlockBundleProducer.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBlockBundleProducer.kt @@ -26,4 +26,4 @@ class EthereumBlockBundleProducer( blockBundle.uncles.forEach { uncle -> kafkaTemplate.send(chain.unclePumpTopic, PumpEvent.NEW_BLOCK, uncle) } } } -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBundleProducerConfiguration.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBundleProducerConfiguration.kt index 52bf1db6..634d1108 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBundleProducerConfiguration.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/EthereumBundleProducerConfiguration.kt @@ -54,9 +54,10 @@ class EthereumBundleProducerConfiguration { @Bean fun producerFactory(): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()).apply { - setTransactionIdPrefix(chain.name + "_PUMP") - } + return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()) + .apply { + setTransactionIdPrefix(chain.name + "_PUMP") + } } @Bean @@ -73,4 +74,4 @@ class EthereumBundleProducerConfiguration { fun transactionManager(): KafkaTransactionManager { return KafkaTransactionManager(producerFactory()) } -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/LastPumpedEthereumBundleProvider.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/LastPumpedEthereumBundleProvider.kt index 6b1c199d..be5dfd57 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/LastPumpedEthereumBundleProvider.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/kafka/LastPumpedEthereumBundleProvider.kt @@ -35,4 +35,4 @@ class LastPumpedEthereumBundleProvider( return listOf(event to bundle) } -} \ No newline at end of file +} diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/monitoring/LastTopicOffsetMonitoring.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/monitoring/LastTopicOffsetMonitoring.kt index 75151867..e14b2a42 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/monitoring/LastTopicOffsetMonitoring.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/monitoring/LastTopicOffsetMonitoring.kt @@ -8,10 +8,15 @@ import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.common.TopicPartition +import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import java.util.concurrent.atomic.AtomicLong +private val log = LoggerFactory.getLogger(LastTopicOffsetMonitoring::class.java)!! + +private const val LAST_NETWORK_BLOCK_NUMBER_TIMEOUT = 10 * 1000L + @Component class LastTopicOffsetMonitoring( monitoring: MeterRegistry, @@ -28,7 +33,7 @@ class LastTopicOffsetMonitoring( private val lastUncleTopicOffsetMonitor = monitoring.gauge("pump_topic_last_offset", Tags.of("topic", chain.unclePumpTopic), AtomicLong(lastOffset(chain.unclePumpTopic)))!! - @Scheduled(fixedRate = 10 * 1000) + @Scheduled(fixedRate = LAST_NETWORK_BLOCK_NUMBER_TIMEOUT) fun getLastNetworkBlockNumber() { try { val lastTxTopicOffset = lastOffset(chain.txPumpTopic) @@ -40,6 +45,7 @@ class LastTopicOffsetMonitoring( val lastUncleTopicOffset = lastOffset(chain.unclePumpTopic) lastUncleTopicOffsetMonitor.set(lastUncleTopicOffset) } catch (e: Exception) { + log.error("Error getting last network block number", e) } } @@ -54,4 +60,4 @@ class LastTopicOffsetMonitoring( return consumer.position(partition) - 1 } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/SearchApiApplication.kt b/search-api/src/main/kotlin/fund/cyber/SearchApiApplication.kt index d0815731..acb65492 100644 --- a/search-api/src/main/kotlin/fund/cyber/SearchApiApplication.kt +++ b/search-api/src/main/kotlin/fund/cyber/SearchApiApplication.kt @@ -13,4 +13,4 @@ class SearchApiApplication fun main(args: Array) { runApplication(*args) -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/common/CommonConfiguration.kt b/search-api/src/main/kotlin/fund/cyber/api/common/CommonConfiguration.kt index f0e95829..b29107dd 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/common/CommonConfiguration.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/common/CommonConfiguration.kt @@ -1,12 +1,20 @@ package fund.cyber.api.common -import fund.cyber.search.configuration.* +import fund.cyber.search.configuration.CASSANDRA_HOSTS +import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT +import fund.cyber.search.configuration.CORS_ALLOWED_ORIGINS +import fund.cyber.search.configuration.CORS_ALLOWED_ORIGINS_DEFAULT +import fund.cyber.search.configuration.ELASTIC_CLUSTER_NAME +import fund.cyber.search.configuration.ELASTIC_CLUSTER_NAME_DEFAULT +import fund.cyber.search.configuration.ELASTIC_TRANSPORT_PORT +import fund.cyber.search.configuration.ELASTIC_TRANSPORT_PORT_DEFAULT import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.transport.client.PreBuiltTransportClient import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.* +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration import org.springframework.web.cors.CorsConfiguration import org.springframework.web.cors.reactive.CorsWebFilter import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource @@ -59,4 +67,4 @@ class CommonConfiguration { return CorsWebFilter(source) } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/common/MetricsWebFilter.kt b/search-api/src/main/kotlin/fund/cyber/api/common/MetricsWebFilter.kt index af23e5db..1f2c27d7 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/common/MetricsWebFilter.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/common/MetricsWebFilter.kt @@ -13,6 +13,9 @@ import org.springframework.web.server.WebFilterChain import reactor.core.publisher.Mono import java.util.concurrent.TimeUnit +const val NINGTHY_FIVE_PERCENT = 0.95 +const val NINE_HUNDRED_NINGTHY_FIVE_PERCENT = 0.95 + @Component @Order(Ordered.HIGHEST_PRECEDENCE + 1) class MetricsWebFilter(private val registry: MeterRegistry) : WebFilter { @@ -34,7 +37,7 @@ class MetricsWebFilter(private val registry: MeterRegistry) : WebFilter { Timer.builder("http_requests_processing") .tags(listOf(uriTag)) - .publishPercentiles(0.95, 0.995) + .publishPercentiles(NINGTHY_FIVE_PERCENT, NINE_HUNDRED_NINGTHY_FIVE_PERCENT) .register(registry) .record(System.nanoTime() - start, TimeUnit.NANOSECONDS) } diff --git a/search-api/src/main/kotlin/fund/cyber/api/common/PingController.kt b/search-api/src/main/kotlin/fund/cyber/api/common/PingController.kt index 559e9476..ad38edca 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/common/PingController.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/common/PingController.kt @@ -12,4 +12,4 @@ class PingController { @GetMapping("/ping") @ResponseStatus(HttpStatus.OK) fun ping() = {} -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/AddressHandlers.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/AddressHandlers.kt index 13c2cca9..be032bec 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/AddressHandlers.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/AddressHandlers.kt @@ -28,7 +28,8 @@ class AddressHandlersConfiguration { fun addressById(): RouterFunction { return EthereumFamilyChain.values().map { chain -> - val repository = applicationContext.getBean(chain.name + "addressRepository", EthereumAddressRepository::class.java) + val repository = applicationContext + .getBean(chain.name + "addressRepository", EthereumAddressRepository::class.java) val blockByNumber = HandlerFunction { request -> val addressId = request.pathVariable("id") @@ -50,4 +51,4 @@ class AddressHandlersConfiguration { RouterFunctions.route(RequestPredicates.path("/${chain.lowerCaseName}/address/{id}/transactions"), handler) }.asSingleRouterFunction() } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/BlockHandlers.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/BlockHandlers.kt index 5c0ade4c..d200fecd 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/BlockHandlers.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/BlockHandlers.kt @@ -28,7 +28,8 @@ class BlockHandlersConfiguration { fun blockByNumber(): RouterFunction { return EthereumFamilyChain.values().map { chain -> - val blockRepository = applicationContext.getBean(chain.name + "blockRepository", EthereumBlockRepository::class.java) + val blockRepository = applicationContext + .getBean(chain.name + "blockRepository", EthereumBlockRepository::class.java) val blockByNumber = HandlerFunction { request -> val blockNumber = request.pathVariable("blockNumber").toLong() @@ -50,4 +51,4 @@ class BlockHandlersConfiguration { RouterFunctions.route(path("/${chain.lowerCaseName}/block/{blockNumber}/transactions"), handler) }.asSingleRouterFunction() } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/TxHandlers.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/TxHandlers.kt index b8648e53..8101d370 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/TxHandlers.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/TxHandlers.kt @@ -38,4 +38,4 @@ class TxHandlersConfiguration { RouterFunctions.route(RequestPredicates.path("/${chain.lowerCaseName}/tx/{hash}"), txByHash) }.asSingleRouterFunction() } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/AddressTxesByAddres.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/AddressTxesByAddres.kt index 879ce7f5..a64b53bd 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/AddressTxesByAddres.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/AddressTxesByAddres.kt @@ -30,4 +30,4 @@ class AddressTxesByAddres( } return ServerResponse.ok().body(slice.content.toFlux(), CqlEthereumAddressTxPreview::class.java) } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/BlockTxesByBlockNumber.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/BlockTxesByBlockNumber.kt index 611fe66b..d6020d9d 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/BlockTxesByBlockNumber.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/functions/BlockTxesByBlockNumber.kt @@ -30,4 +30,4 @@ class BlockTxesByBlockNumber( } return ServerResponse.ok().body(slice.content.toFlux(), CqlEthereumBlockTxPreview::class.java) } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/search/SearchController.kt b/search-api/src/main/kotlin/fund/cyber/api/search/SearchController.kt index 45e0d2c1..2e7a8c84 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/search/SearchController.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/search/SearchController.kt @@ -42,4 +42,4 @@ class SearchController( ) ) } -} \ No newline at end of file +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/search/SearchModel.kt b/search-api/src/main/kotlin/fund/cyber/api/search/SearchModel.kt index 7d7a4d23..3fbc03d3 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/search/SearchModel.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/search/SearchModel.kt @@ -19,4 +19,4 @@ data class SearchResponse( val totalHits: Long, val searchTime: Long, //ms val items: List -) \ No newline at end of file +)