Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#3 add cassandra for query input transactions, define transactions st…
Browse files Browse the repository at this point in the history
…ream
  • Loading branch information
hleb-albau committed Sep 24, 2017
1 parent f57f12d commit e7b8c26
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 4 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ buildscript {
jacksonVersion = "2.8.9"
undertowVersion = "1.4.20.Final"
kafkaVersion = "0.11.0.1"
cassandraConfluentVersion = "3.3.0"
web3jVersion = "2.3.0"
apacheHttpClientVersion = "4.1.3"

Expand Down Expand Up @@ -84,6 +85,9 @@ subprojects {
dependency("org.mockito:mockito-core:$mockitoVersion")
dependency("com.nhaarman:mockito-kotlin:$mockitoKotlinVersion")

dependency("com.datastax.cassandra:cassandra-driver-core:$cassandraConfluentVersion")
dependency("com.datastax.cassandra:cassandra-driver-mapping:$cassandraConfluentVersion")
dependency("com.datastax.cassandra:cassandra-driver-extras:$cassandraConfluentVersion")

dependency("org.web3j:core:$web3jVersion")
}
Expand Down
2 changes: 1 addition & 1 deletion devops/elassandra/bootstrap.cql
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ CREATE TYPE IF NOT EXISTS blockchains.bitcoin_tx_in (
address text,
amount decimal,
asm text,
tx_hash text,
tx_id text,
tx_out tinyint
);

Expand Down
2 changes: 2 additions & 0 deletions stream-processing/bitcoin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ dependencies {
compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")
compile("org.apache.kafka:kafka-streams")
compile("org.apache.kafka:kafka-clients")
compile("com.datastax.cassandra:cassandra-driver-core")
compile("com.datastax.cassandra:cassandra-driver-mapping")
}

mainClassName = "fund.cyber.index.bitcoin.BitcoinBlockSplitterApplication"
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package fund.cyber.index.bitcoin

import com.datastax.driver.core.Cluster
import fund.cyber.index.IndexTopics
import fund.cyber.index.btcd.BtcdBlock
import fund.cyber.index.btcd.BtcdRegularTransactionInput
import fund.cyber.node.kafka.JsonDeserializer
import fund.cyber.node.kafka.JsonSerializer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.KafkaStreams
import org.slf4j.LoggerFactory
import fund.cyber.node.model.BitcoinTransaction
import com.datastax.driver.mapping.MappingManager
import fund.cyber.index.bitcoin.converter.BitcoinTransactionConverter


object BitcoinBlockSplitterApplication {

Expand All @@ -17,22 +23,53 @@ object BitcoinBlockSplitterApplication {
fun main(args: Array<String>) {

val streamsConfiguration = StreamConfiguration()

val cassandra = Cluster.builder()
.addContactPoint(streamsConfiguration.cassandraServers)
.build()

val transactionConverter = BitcoinTransactionConverter()
val builder = KStreamBuilder()

val bitcoinBlockSerde = Serdes.serdeFrom<BtcdBlock>(JsonSerializer<BtcdBlock>(), JsonDeserializer(BtcdBlock::class.java))

builder.stream<Any, BtcdBlock>(null, bitcoinBlockSerde, IndexTopics.bitcoinSourceTopic)
.flatMapValues { btcdBlock ->
val block = getBlockForStorage(btcdBlock)
listOf(block)

val inputTransactions = loadInputTransactions(btcdBlock, cassandra)
val newTransactionsByBlock = transactionConverter.btcdTransactionsToDao(btcdBlock, inputTransactions)
listOf(newTransactionsByBlock)
}
.to("bitcoin_tx")

val streams = KafkaStreams(builder, streamsConfiguration.streamProperties())

streams.setUncaughtExceptionHandler { thread: Thread, throwable: Throwable ->
log.error("Error during splitting bitcoin block ", throwable)
}
streams.start()
Runtime.getRuntime().addShutdownHook(Thread(streams::close))
Runtime.getRuntime().addShutdownHook(Thread {
streams.close()
cassandra.close()
})
}

private fun loadInputTransactions(btcdBlock: BtcdBlock, cassandra: Cluster): Map<String, BitcoinTransaction> {

val incomingNonCoinbaseTransactionsIds = btcdBlock.rawtx
.flatMap { transaction -> transaction.vin }
.filter { txInput -> txInput is BtcdRegularTransactionInput }
.map { txInput -> (txInput as BtcdRegularTransactionInput).txid }
.joinToString(separator = ",")

if (incomingNonCoinbaseTransactionsIds.isEmpty()) return emptyMap()

val session = cassandra.connect("blockchains")
val manager = MappingManager(session)
val mapper = manager.mapper(BitcoinTransaction::class.java)

val resultSet = session.execute("SELECT * FROM bitcoin_tx WHERE hash in ($incomingNonCoinbaseTransactionsIds)")

return mapper.map(resultSet).associateBy { tx -> tx.txId }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.util.*

class StreamConfiguration(
val kafkaServers: String = env("KAFKA_CONNECTION", "localhost:9092"),
val cassandraServers: String = env("CASSANDRA_CONNECTION", "localhost:9042"),
val applicationId: String = "cyber.index.bitcoin.block.splitter"
) {
fun streamProperties(): Properties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import java.time.Instant

class BitcoinTransactionConverter {

fun btcdTransactionsToDao(
btcdBlock: BtcdBlock, inputDaoTransactionById: Map<String, BitcoinTransaction>): List<BitcoinTransaction> {

return btcdBlock.rawtx
.map { btcdTransaction -> btcdTransactionToDao(btcdTransaction, btcdBlock, inputDaoTransactionById) }
}


fun btcdTransactionToDao(
btcdTransaction: BtcdTransaction, btcdBlock: BtcdBlock,
Expand Down

0 comments on commit e7b8c26

Please sign in to comment.