Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Chain pump fixed #39

Merged
merged 18 commits into from
Dec 28, 2017
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package fund.cyber.address

import fund.cyber.address.bitcoin.getBitcoinAddressesUpdateProcessParameters
import fund.cyber.address.common.AddressesUpdateProcessParameters
import fund.cyber.address.common.ApplyingAddressDeltasProcess
import fund.cyber.address.common.ConvertEntityToAddressDeltaProcess
import fund.cyber.address.common.*
import fund.cyber.address.ethereum.getEthereumAddressesUpdateProcessParameters
import fund.cyber.cassandra.CassandraService
import fund.cyber.cassandra.repository.BitcoinKeyspaceRepository
import fund.cyber.cassandra.repository.EthereumKeyspaceRepository
import fund.cyber.node.common.Chain
import fund.cyber.node.common.Chain.*
import fund.cyber.node.common.env
import fund.cyber.node.kafka.KafkaConsumerRunner
import fund.cyber.node.kafka.KafkaEvent
import fund.cyber.node.kafka.PumpEvent
import org.ehcache.CacheManager
import org.ehcache.config.builders.CacheManagerBuilder
import org.ehcache.xml.XmlConfiguration
Expand All @@ -31,24 +31,17 @@ object AddressServiceApplication {

private fun run() {

val chainsToPump: List<Chain> = env("CS_CHAINS_TO_PUMP", "")
.split(",").map(String::trim).filter(String::isNotEmpty).map(Chain::valueOf)

val cassandraService = CassandraService(ServiceConfiguration.cassandraServers, ServiceConfiguration.cassandraPort)
val cacheManager = getCacheManager()

val bitcoinProcesses = listOf(BITCOIN, BITCOIN_CASH).map { chain ->
val repository = cassandraService.getChainRepository(chain) as BitcoinKeyspaceRepository
val parameters = getBitcoinAddressesUpdateProcessParameters(chain, repository, cacheManager)
return@map toUpdateAddressesStateProcess(parameters)
}.flatten()

val ethereumProcesses = listOf(ETHEREUM, ETHEREUM_CLASSIC).map { chain ->
val repository = cassandraService.getChainRepository(chain) as EthereumKeyspaceRepository
val parameters = getEthereumAddressesUpdateProcessParameters(chain, repository, cacheManager)
return@map toUpdateAddressesStateProcess(parameters)
}.flatten()

val processes = bitcoinProcesses + ethereumProcesses
val processes = chainsToPump
.map { chain -> chainUpdateAddressesStateProcesses(chain, cassandraService, cacheManager) }
.flatten()

val executor = Executors.newFixedThreadPool(processes.size * 2)
val executor = Executors.newFixedThreadPool(processes.size * 3)
processes.forEach { process -> executor.execute(process) }

Runtime.getRuntime().addShutdownHook(object : Thread() {
Expand All @@ -61,15 +54,38 @@ object AddressServiceApplication {
})
}

private fun toUpdateAddressesStateProcess(processParameters: AddressesUpdateProcessParameters)
: List<KafkaConsumerRunner<KafkaEvent, out Any>> {
private fun chainUpdateAddressesStateProcesses(
chain: Chain, cassandraService: CassandraService, cacheManager: CacheManager
): List<KafkaConsumerRunner<PumpEvent, out Any>> {

return when (chain) {
BITCOIN, BITCOIN_CASH -> {
val repository = cassandraService.getChainRepository(chain) as BitcoinKeyspaceRepository
val parameters = getBitcoinAddressesUpdateProcessParameters(chain, repository, cacheManager)
toUpdateAddressesStateProcess(parameters)
}
ETHEREUM, ETHEREUM_CLASSIC -> {
val repository = cassandraService.getChainRepository(chain) as EthereumKeyspaceRepository
val parameters = getEthereumAddressesUpdateProcessParameters(chain, repository, cacheManager)
toUpdateAddressesStateProcess(parameters)
}
else -> emptyList()
}
}

@Suppress("UNCHECKED_CAST")
private fun toUpdateAddressesStateProcess(processParameters: AddressesUpdateProcessParameters<out AddressDelta>)
: List<KafkaConsumerRunner<PumpEvent, out Any>> {

val chain = processParameters.chain

val deltasEmitingProcesses = processParameters.convertEntityToAddressDeltaProcessesParameters
.map { parameters -> ConvertEntityToAddressDeltaProcess(chain, parameters) }

val applyDeltasProcess = ApplyingAddressDeltasProcess(chain, processParameters.applyAddressDeltaFunction)
val applyDeltasProcess = ApplyingAddressDeltasProcess(
chain, processParameters.addressDeltaClassType as Class<AddressDelta>,
processParameters.applyAddressDeltaFunction as ApplyAddressDeltaFunction<AddressDelta>
)

return deltasEmitingProcesses + applyDeltasProcess
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ class BitcoinTransactionToAddressDeltaFunction : ConvertItemToAddressDeltaFuncti

val addressesDeltasByIns = tx.ins.flatMap { input ->
input.addresses.map { address ->
AddressDelta(address, BigDecimal(input.amount).negate(), blockNumber, TRANSACTION)
AddressDelta(TRANSACTION, address, blockNumber, BigDecimal(input.amount).negate(), txNumberDelta = 1)
}
}

val addressesDeltasByOuts = tx.outs.flatMap { output ->
output.addresses.map { address ->
AddressDelta(address, BigDecimal(output.amount), blockNumber, TRANSACTION)
AddressDelta(TRANSACTION, address, blockNumber, BigDecimal(output.amount), txNumberDelta = 1)
}
}
return addressesDeltasByIns + addressesDeltasByOuts
Expand All @@ -36,7 +36,7 @@ class BitcoinTransactionToAddressDeltaFunction : ConvertItemToAddressDeltaFuncti
class ApplyBitcoinAddressDeltaFunction(
private val repository: BitcoinKeyspaceRepository,
private val addressCache: Cache<String, BitcoinAddress>
) : ApplyAddressDeltaFunction {
) : ApplyAddressDeltaFunction<AddressDelta> {

override fun invoke(addressDelta: AddressDelta) {

Expand All @@ -55,19 +55,19 @@ private fun nonExistingAddressFromDelta(delta: AddressDelta): BitcoinAddress {

return BitcoinAddress(
id = delta.address, confirmed_tx_number = 1,
confirmed_balance = delta.delta.toString(), confirmed_total_received = delta.delta
confirmed_balance = delta.balanceDelta.toString(), confirmed_total_received = delta.balanceDelta
)
}


private fun updatedAddressByDelta(address: BitcoinAddress, addressDelta: AddressDelta): BitcoinAddress {

val sign = addressDelta.delta.signum()
val sign = addressDelta.balanceDelta.signum()

val totalReceived =
if (sign > 0) address.confirmed_total_received + addressDelta.delta else address.confirmed_total_received
if (sign > 0) address.confirmed_total_received + addressDelta.balanceDelta else address.confirmed_total_received

val newBalance = (BigDecimal(address.confirmed_balance) + addressDelta.delta).toString()
val newBalance = (BigDecimal(address.confirmed_balance) + addressDelta.balanceDelta).toString()

return BitcoinAddress(
id = address.id, confirmed_tx_number = address.confirmed_tx_number + 1,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package fund.cyber.address.bitcoin

import fund.cyber.address.common.AddressDelta
import fund.cyber.address.common.AddressesUpdateProcessParameters
import fund.cyber.address.common.ConvertEntityToAddressDeltaProcessParameters
import fund.cyber.address.common.addressCache
Expand All @@ -14,7 +15,7 @@ import org.ehcache.CacheManager

fun getBitcoinAddressesUpdateProcessParameters(
chain: Chain, keyspaceRepository: BitcoinKeyspaceRepository, cacheManager: CacheManager
): AddressesUpdateProcessParameters {
): AddressesUpdateProcessParameters<AddressDelta> {

val transactionToAddressDeltaProcessesParameters = ConvertEntityToAddressDeltaProcessParameters(
entityType = BitcoinTransaction::class.java,
Expand All @@ -26,7 +27,8 @@ fun getBitcoinAddressesUpdateProcessParameters(
val applyAddressDeltaFunction = ApplyBitcoinAddressDeltaFunction(keyspaceRepository, addressCache)

return AddressesUpdateProcessParameters(
chain = chain, applyAddressDeltaFunction = applyAddressDeltaFunction,
chain = chain, addressDeltaClassType = AddressDelta::class.java,
applyAddressDeltaFunction = applyAddressDeltaFunction,
convertEntityToAddressDeltaProcessesParameters = listOf(transactionToAddressDeltaProcessesParameters)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ import fund.cyber.node.common.Chain
import fund.cyber.node.common.ChainEntity
import java.math.BigDecimal

data class AddressDelta(
open class AddressDelta(
val source: ChainEntity,
val address: String,
val delta: BigDecimal,
val blockNumber: Long,
val source: ChainEntity
)
val balanceDelta: BigDecimal,
val txNumberDelta: Int
) {

open fun reverseDelta(): AddressDelta = AddressDelta(
source = source, address = address, blockNumber = blockNumber,
balanceDelta = -balanceDelta, txNumberDelta = -txNumberDelta
)
}


val Chain.addressDeltaTopic: String get() = name + "_ADDRESS_DELTA"
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ data class ConvertEntityToAddressDeltaProcessParameters<T : CyberSearchItem>(
)


data class AddressesUpdateProcessParameters(
data class AddressesUpdateProcessParameters<D : AddressDelta>(
val chain: Chain,
val addressDeltaClassType: Class<D>,
val convertEntityToAddressDeltaProcessesParameters: List<ConvertEntityToAddressDeltaProcessParameters<*>>,
val applyAddressDeltaFunction: ApplyAddressDeltaFunction
val applyAddressDeltaFunction: ApplyAddressDeltaFunction<D>
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import fund.cyber.node.common.Chain
import fund.cyber.node.common.ChainEntity
import fund.cyber.node.kafka.ExactlyOnceKafkaConsumerRunner
import fund.cyber.node.kafka.JsonDeserializer
import fund.cyber.node.kafka.KafkaEvent
import fund.cyber.node.kafka.PumpEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
Expand All @@ -14,33 +14,35 @@ import java.util.*

val Chain.addressCache: String get() = name + "_" + ChainEntity.ADDRESS

typealias ApplyAddressDeltaFunction = (AddressDelta) -> Unit
typealias ApplyAddressDeltaFunction<D> = (D) -> Unit

private val log = LoggerFactory.getLogger(ApplyingAddressDeltasProcess::class.java)!!

class ApplyingAddressDeltasProcess(

//todo apply CAS and || execution via partitions
class ApplyingAddressDeltasProcess<D : AddressDelta>(
private val chain: Chain,
private val updateAddressStateByDeltaFunction: ApplyAddressDeltaFunction
) : ExactlyOnceKafkaConsumerRunner<KafkaEvent, AddressDelta>(listOf(chain.addressDeltaTopic)) {
private val addressDeltaClassType: Class<D>,
private val updateAddressStateByDeltaFunction: ApplyAddressDeltaFunction<D>
) : ExactlyOnceKafkaConsumerRunner<PumpEvent, D>(listOf(chain.addressDeltaTopic)) {

private var lastProcessedItemBlock = -1L

private val kafkaProperties = Properties().apply {
put("bootstrap.servers", ServiceConfiguration.kafkaBrokers)
put("group.id", "bitcoin-address-updates-persistence-process")
put("group.id", "$chain-apply-address-deltas-process")
put("enable.auto.commit", false)
put("isolation.level", "read_committed")
put("auto.offset.reset", "earliest")
}

override val consumer by lazy {
KafkaConsumer<KafkaEvent, AddressDelta>(
kafkaProperties,
JsonDeserializer(KafkaEvent::class.java), JsonDeserializer(AddressDelta::class.java)
KafkaConsumer<PumpEvent, D>(
kafkaProperties, JsonDeserializer(PumpEvent::class.java), JsonDeserializer(addressDeltaClassType)
)
}

override fun processRecord(record: ConsumerRecord<KafkaEvent, AddressDelta>) {
override fun processRecord(record: ConsumerRecord<PumpEvent, D>) {

val addressDelta = record.value()

Expand All @@ -54,7 +56,7 @@ class ApplyingAddressDeltasProcess(
try {
updateAddressStateByDeltaFunction(addressDelta)
} catch (e: Exception) {
log.error("Calculating $chain addresses deltas for address ${addressDelta.address} finished with error", e)
log.error("Applying $chain addresses deltas for address ${addressDelta.address} finished with error", e)
Runtime.getRuntime().exit(-1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import fund.cyber.node.common.awaitAll
import fund.cyber.node.kafka.JsonDeserializer
import fund.cyber.node.kafka.JsonSerializer
import fund.cyber.node.kafka.KafkaConsumerRunner
import fund.cyber.node.kafka.KafkaEvent
import fund.cyber.node.kafka.PumpEvent
import fund.cyber.node.model.CyberSearchItem
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
Expand All @@ -25,7 +25,7 @@ class ConvertEntityToAddressDeltaProcess<T : CyberSearchItem>(
private val chain: Chain,
private val parameters: ConvertEntityToAddressDeltaProcessParameters<T>,
private val topic: String = parameters.inputTopic
) : KafkaConsumerRunner<KafkaEvent, T>(listOf(topic)) {
) : KafkaConsumerRunner<PumpEvent, T>(listOf(topic)) {

private var lastProcessedItemBlock = -1L

Expand All @@ -50,9 +50,9 @@ class ConvertEntityToAddressDeltaProcess<T : CyberSearchItem>(
}

override val consumer by lazy {
KafkaConsumer<KafkaEvent, T>(
KafkaConsumer<PumpEvent, T>(
consumerProperties,
JsonDeserializer(KafkaEvent::class.java), JsonDeserializer(parameters.entityType)
JsonDeserializer(PumpEvent::class.java), JsonDeserializer(parameters.entityType)
)
}

Expand All @@ -62,13 +62,18 @@ class ConvertEntityToAddressDeltaProcess<T : CyberSearchItem>(
}


override fun processRecord(partition: TopicPartition, record: ConsumerRecord<KafkaEvent, T>) {
override fun processRecord(partition: TopicPartition, record: ConsumerRecord<PumpEvent, T>) {

val item = record.value() as T
val deltas = parameters.convertEntityToAddressDeltaFunction(item)
val event = record.key()

val deltas = parameters.convertEntityToAddressDeltaFunction(item).let { delta ->
if (event != PumpEvent.DROPPED_BLOCK) delta else delta.reversed()
}

if (deltas.isEmpty()) return


val blockNumber = deltas.first().blockNumber

if (lastProcessedItemBlock != blockNumber) {
Expand Down
Loading