Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#73 Add detekt static analytic for source code. Fix detekt issues
Browse files Browse the repository at this point in the history
  • Loading branch information
arturalbov authored and hleb-albau committed Mar 27, 2018
1 parent 22d906e commit af77033
Show file tree
Hide file tree
Showing 110 changed files with 432 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ class BitcoinAddressSummaryStorage(

override fun findById(id: String): Mono<CqlBitcoinAddressSummary> = addressSummaryRepository.findById(id)

override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlBitcoinAddressSummary> = addressSummaryRepository.findAllByIdIn(ids)
override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlBitcoinAddressSummary> = addressSummaryRepository
.findAllByIdIn(ids)

override fun update(summary: CqlBitcoinAddressSummary, oldVersion: Long): Mono<Boolean> = addressSummaryRepository.update(summary, oldVersion)
override fun update(summary: CqlBitcoinAddressSummary, oldVersion: Long): Mono<Boolean> = addressSummaryRepository
.update(summary, oldVersion)

override fun insertIfNotRecord(summary: CqlBitcoinAddressSummary): Mono<Boolean> = addressSummaryRepository.insertIfNotRecord(summary)
override fun insertIfNotRecord(summary: CqlBitcoinAddressSummary): Mono<Boolean> = addressSummaryRepository
.insertIfNotRecord(summary)

override fun commitUpdate(address: String, newVersion: Long): Mono<Boolean> = addressSummaryRepository.commitUpdate(address, newVersion)
override fun commitUpdate(address: String, newVersion: Long): Mono<Boolean> = addressSummaryRepository
.commitUpdate(address, newVersion)

override fun update(summary: CqlBitcoinAddressSummary): Mono<CqlBitcoinAddressSummary> = addressSummaryRepository.save(summary)
override fun update(summary: CqlBitcoinAddressSummary): Mono<CqlBitcoinAddressSummary> = addressSummaryRepository
.save(summary)

override fun remove(address: String): Mono<Void> = addressSummaryRepository.deleteById(address)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.*
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.BATCH
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler
import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.transaction.annotation.EnableTransactionManagement

private const val MAX_POLL_RECORDS_CONFIG = 10

@EnableKafka
@Configuration
@EnableTransactionManagement
Expand Down Expand Up @@ -74,6 +77,6 @@ class BitcoinTxConsumerConfiguration {
ConsumerConfig.GROUP_ID_CONFIG to "bitcoin-address-summary-update-process",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 10
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ class BitcoinDeltaMerger: DeltaMerger<BitcoinAddressSummaryDelta> {
val existingSummary = currentAddresses[first.address]


// todo: what if deltas to apply is empty? Case: we didn't commit offset range and dropped. Then restored with the same offset range
val deltasToApply = deltas.filterNot { delta ->
existingSummary != null && existingSummary.kafkaDeltaTopic == delta.topic
&& existingSummary.kafkaDeltaPartition == delta.partition && delta.offset <= existingSummary.kafkaDeltaOffset
&& existingSummary.kafkaDeltaPartition == delta.partition
&& delta.offset <= existingSummary.kafkaDeltaOffset
}
val balance = deltasToApply.sumByDecimal { delta -> delta.balanceDelta }
val totalReceived = deltasToApply.sumByDecimal { delta -> delta.totalReceivedDelta }
Expand All @@ -119,4 +119,4 @@ class BitcoinDeltaMerger: DeltaMerger<BitcoinAddressSummaryDelta> {
offset = deltasToApply.maxBy { it -> it.offset }!!.offset
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ class CommonConfiguration {
fun metricsCommonTags(): MeterRegistryCustomizer<MeterRegistry> {
return MeterRegistryCustomizer { registry -> registry.config().commonTags("chain", chain.name) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ interface DeltaProcessor<R, S: CqlAddressSummary, out D: AddressSummaryDelta<S>>
//todo this class should not be aware of kafka records
interface DeltaMerger<D: AddressSummaryDelta<*>> {
fun mergeDeltas(deltas: Iterable<D>, currentAddresses: Map<String, CqlAddressSummary>): D?
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package fund.cyber.address.common.delta.apply

class AddressLockException: Exception()
class AddressLockException: Exception()
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,26 @@ fun <T> Flux<T>.await(): List<T> {
return this.collectList().block()!!
}

fun CqlAddressSummary.hasSameTopicPartitionAs(delta: AddressSummaryDelta<*>) =
this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition

fun CqlAddressSummary.hasSameTopicPartitionAs(topic: String, partition: Int) =
this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition

fun CqlAddressSummary.notSameTopicPartitionAs(delta: AddressSummaryDelta<*>) =
hasSameTopicPartitionAs(delta).not()

fun CqlAddressSummary.committed() = this.kafkaDeltaOffsetCommitted

fun CqlAddressSummary.notCommitted() = committed().not()

@Suppress("MagicNumber")
private val applicationContext = newFixedThreadPoolContext(8, "Coroutines Concurrent Pool")
private val log = LoggerFactory.getLogger(UpdateAddressSummaryProcess::class.java)!!

private const val MAX_STORE_ATTEMPTS = 20
private const val STORE_RETRY_TIMEOUT = 30L

data class UpdateInfo(
val topic: String,
val partition: Int,
Expand Down Expand Up @@ -92,14 +109,18 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
deltaStoreTimer.recordCallable {
runBlocking {
mergedDeltas.values.map { delta ->
async(applicationContext) { store(addressesSummary[delta.address], delta, storeAttempts, previousStates) }
async(applicationContext) {
store(addressesSummary[delta.address], delta, storeAttempts, previousStates)
}
}.map { it.await() }
}
}

commitKafkaTimer.recordCallable { consumer.commitSync() }

val newSummaries = downloadCassandraTimer.recordCallable { addressSummaryStorage.findAllByIdIn(addresses).await() }
val newSummaries = downloadCassandraTimer.recordCallable {
addressSummaryStorage.findAllByIdIn(addresses).await()
}

commitCassandraTimer.recordCallable {
runBlocking {
Expand Down Expand Up @@ -139,7 +160,9 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
}
}

private suspend fun store(summary: S?, delta: D, storeAttempts: MutableMap<String, Int>, previousStates: MutableMap<String, S?>) {
@Suppress("ComplexMethod", "NestedBlockDepth")
private suspend fun store(summary: S?, delta: D, storeAttempts: MutableMap<String, Int>,
previousStates: MutableMap<String, S?>) {

previousStates[delta.address] = summary
if (summary != null) {
Expand All @@ -155,7 +178,7 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
}

if (summary.notCommitted() && summary.notSameTopicPartitionAs(delta)) {
if (storeAttempts[delta.address] ?: 0 > 20) {
if (storeAttempts[delta.address] ?: 0 > MAX_STORE_ATTEMPTS) {
if (summary.currentTopicPartitionWentFurther()) {
val result = delta.applyTo(summary)
if (!result) {
Expand All @@ -166,7 +189,7 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
throw AddressLockException()
}
} else {
delay(30, TimeUnit.MILLISECONDS)
delay(STORE_RETRY_TIMEOUT, TimeUnit.MILLISECONDS)
val inc = storeAttempts.getOrPut(delta.address, { 1 }).inc()
storeAttempts[delta.address] = inc
store(getSummaryByDelta(delta), delta, storeAttempts, previousStates)
Expand All @@ -186,22 +209,6 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe

private suspend fun getSummaryByDelta(delta: D) = addressSummaryStorage.findById(delta.address).await()

private fun CqlAddressSummary.hasSameTopicPartitionAs(delta: D) =
this.kafkaDeltaTopic == delta.topic && this.kafkaDeltaPartition == delta.partition

private fun CqlAddressSummary.hasSameTopicPartitionAs(topic: String, partition: Int) =
this.kafkaDeltaTopic == topic && this.kafkaDeltaPartition == partition

private fun CqlAddressSummary.notSameTopicPartitionAs(delta: D) =
hasSameTopicPartitionAs(delta).not()

private fun CqlAddressSummary.committed() = this.kafkaDeltaOffsetCommitted

private fun CqlAddressSummary.notCommitted() = committed().not()

private fun CqlAddressSummary.currentTopicPartitionWentFurther() =
lastOffsetOf(this.kafkaDeltaTopic, this.kafkaDeltaPartition) > this.kafkaDeltaOffset//todo : = or >= ????

private fun initMonitors(info: UpdateInfo) {
val tags = Tags.of("topic", info.topic)
if (!(::applyLockMonitor.isInitialized)) {
Expand All @@ -225,6 +232,9 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
}
}

private fun CqlAddressSummary.currentTopicPartitionWentFurther() =
lastOffsetOf(this.kafkaDeltaTopic, this.kafkaDeltaPartition) > this.kafkaDeltaOffset//todo : = or >= ????

private fun lastOffsetOf(topic: String, partition: Int): Long {

val reader = SinglePartitionTopicLastItemsReader(
Expand All @@ -241,4 +251,4 @@ class UpdateAddressSummaryProcess<R, S : CqlAddressSummary, D : AddressSummaryDe
}.subscribe()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ interface AddressSummaryStorage<S: CqlAddressSummary> {
fun commitUpdate(address: String, newVersion: Long): Mono<Boolean>
fun update(summary: S): Mono<S>
fun remove(address: String): Mono<Void>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ class UpdateEthereumAddressSummaryApplication {
SpringApplication.run(UpdateEthereumAddressSummaryApplication::class.java, *args)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ class EthereumAddressSummaryStorage(

override fun findById(id: String): Mono<CqlEthereumAddressSummary> = addressSummaryRepository.findById(id)

override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlEthereumAddressSummary> = addressSummaryRepository.findAllById(ids)
override fun findAllByIdIn(ids: Iterable<String>): Flux<CqlEthereumAddressSummary> = addressSummaryRepository
.findAllById(ids)

override fun update(summary: CqlEthereumAddressSummary, oldVersion: Long): Mono<Boolean> = addressSummaryRepository.update(summary, oldVersion)
override fun update(summary: CqlEthereumAddressSummary, oldVersion: Long): Mono<Boolean> = addressSummaryRepository
.update(summary, oldVersion)

override fun insertIfNotRecord(summary: CqlEthereumAddressSummary): Mono<Boolean> = addressSummaryRepository.insertIfNotRecord(summary)
override fun insertIfNotRecord(summary: CqlEthereumAddressSummary): Mono<Boolean> = addressSummaryRepository
.insertIfNotRecord(summary)

override fun commitUpdate(address: String, newVersion: Long): Mono<Boolean> = addressSummaryRepository.commitUpdate(address, newVersion)
override fun commitUpdate(address: String, newVersion: Long): Mono<Boolean> = addressSummaryRepository
.commitUpdate(address, newVersion)

override fun update(summary: CqlEthereumAddressSummary): Mono<CqlEthereumAddressSummary> = addressSummaryRepository.save(summary)
override fun update(summary: CqlEthereumAddressSummary): Mono<CqlEthereumAddressSummary> = addressSummaryRepository
.save(summary)

override fun remove(address: String): Mono<Void> = addressSummaryRepository.deleteById(address)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler
import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.transaction.annotation.EnableTransactionManagement

private const val MAX_POLL_RECORDS_CONFIG = 500

@EnableKafka
@Configuration
@EnableTransactionManagement
Expand Down Expand Up @@ -129,6 +131,6 @@ class EthereumTxConsumerConfiguration {
ConsumerConfig.GROUP_ID_CONFIG to "ethereum-address-summary-update-process",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.ISOLATION_LEVEL_CONFIG to IsolationLevel.READ_COMMITTED.toString().toLowerCase(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to MAX_POLL_RECORDS_CONFIG
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ data class EthereumAddressSummaryDelta(

override fun updateSummary(summary: CqlEthereumAddressSummary): CqlEthereumAddressSummary {
return CqlEthereumAddressSummary(
id = summary.id, confirmedBalance = summary.confirmedBalance + this.balanceDelta, contractAddress = summary.contractAddress,
id = summary.id, confirmedBalance = summary.confirmedBalance + this.balanceDelta,
contractAddress = summary.contractAddress,
confirmedTotalReceived = summary.confirmedTotalReceived + this.totalReceivedDelta,
txNumber = summary.txNumber + this.txNumberDelta, minedUncleNumber = summary.minedUncleNumber + this.uncleNumberDelta,
txNumber = summary.txNumber + this.txNumberDelta,
minedUncleNumber = summary.minedUncleNumber + this.uncleNumberDelta,
minedBlockNumber = summary.minedBlockNumber + this.minedBlockNumberDelta,
kafkaDeltaOffset = this.offset, kafkaDeltaTopic = this.topic,
kafkaDeltaPartition = this.partition, version = summary.version + 1
Expand All @@ -74,8 +76,8 @@ class EthereumTxDeltaProcessor : DeltaProcessor<EthereumTx, CqlEthereumAddressSu
)

val addressDeltaByOutput = EthereumAddressSummaryDelta(
address = (tx.to ?: tx.createdContract)!!, txNumberDelta = 1, minedBlockNumberDelta = 0, uncleNumberDelta = 0,
balanceDelta = tx.value, totalReceivedDelta = tx.value,
address = (tx.to ?: tx.createdContract)!!, txNumberDelta = 1, minedBlockNumberDelta = 0,
uncleNumberDelta = 0, balanceDelta = tx.value, totalReceivedDelta = tx.value,
contractAddress = (tx.createdContract != null), topic = record.topic(), partition = record.partition(),
offset = record.offset()
)
Expand All @@ -97,7 +99,8 @@ class EthereumTxDeltaProcessor : DeltaProcessor<EthereumTx, CqlEthereumAddressSu
}

@Component
class EthereumBlockDeltaProcessor : DeltaProcessor<EthereumBlock, CqlEthereumAddressSummary, EthereumAddressSummaryDelta> {
class EthereumBlockDeltaProcessor
: DeltaProcessor<EthereumBlock, CqlEthereumAddressSummary, EthereumAddressSummaryDelta> {

override fun recordToDeltas(record: ConsumerRecord<PumpEvent, EthereumBlock>): List<EthereumAddressSummaryDelta> {

Expand All @@ -121,7 +124,8 @@ class EthereumBlockDeltaProcessor : DeltaProcessor<EthereumBlock, CqlEthereumAdd
}

@Component
class EthereumUncleDeltaProcessor : DeltaProcessor<EthereumUncle, CqlEthereumAddressSummary, EthereumAddressSummaryDelta> {
class EthereumUncleDeltaProcessor
: DeltaProcessor<EthereumUncle, CqlEthereumAddressSummary, EthereumAddressSummaryDelta> {

override fun recordToDeltas(record: ConsumerRecord<PumpEvent, EthereumUncle>): List<EthereumAddressSummaryDelta> {

Expand Down Expand Up @@ -154,7 +158,8 @@ class EthereumDeltaMerger : DeltaMerger<EthereumAddressSummaryDelta> {

val deltasToApply = deltas.filterNot { delta ->
existingSummary != null && existingSummary.kafkaDeltaTopic == delta.topic
&& existingSummary.kafkaDeltaPartition == delta.partition && delta.offset <= existingSummary.kafkaDeltaOffset
&& existingSummary.kafkaDeltaPartition == delta.partition
&& delta.offset <= existingSummary.kafkaDeltaOffset
}
val balance = deltasToApply.sumByDecimal { delta -> delta.balanceDelta }
val totalReceived = deltasToApply.sumByDecimal { delta -> delta.totalReceivedDelta }
Expand All @@ -166,7 +171,8 @@ class EthereumDeltaMerger : DeltaMerger<EthereumAddressSummaryDelta> {
address = first.address, balanceDelta = balance, totalReceivedDelta = totalReceived,
txNumberDelta = txNumber, minedBlockNumberDelta = blockNumber, uncleNumberDelta = uncleNumber,
contractAddress = deltasToApply.any { delta -> delta.contractAddress ?: false },
topic = first.topic, partition = first.partition, offset = deltasToApply.maxBy { it -> it.offset }!!.offset
topic = first.topic, partition = first.partition,
offset = deltasToApply.maxBy { it -> it.offset }!!.offset
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@ import fund.cyber.cassandra.configuration.CassandraRepositoriesConfiguration
import fund.cyber.cassandra.configuration.keyspace
import fund.cyber.cassandra.migration.BlockchainMigrationSettings
import fund.cyber.cassandra.migration.MigrationSettings
import fund.cyber.search.configuration.*
import fund.cyber.search.configuration.CASSANDRA_HOSTS
import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_PORT
import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT
import fund.cyber.search.configuration.CHAIN
import fund.cyber.search.configuration.env
import fund.cyber.search.model.chains.BitcoinFamilyChain
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.*
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Condition
import org.springframework.context.annotation.ConditionContext
import org.springframework.context.annotation.Conditional
import org.springframework.context.annotation.Configuration
import org.springframework.core.type.AnnotatedTypeMetadata
import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories

Expand Down Expand Up @@ -41,4 +50,4 @@ private class BitcoinFamilyChainCondition : Condition {
val chain = context.environment.getProperty(CHAIN) ?: ""
return BitcoinFamilyChain.values().map(BitcoinFamilyChain::name).contains(chain)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ data class CqlBitcoinAddressTx(
val block_number: Long,
val ins: List<CqlBitcoinTxPreviewIO>,
val outs: List<CqlBitcoinTxPreviewIO>
) : CqlBitcoinItem
) : CqlBitcoinItem
Loading

0 comments on commit af77033

Please sign in to comment.