Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#3 btcd connector : use schemalles values
Browse files Browse the repository at this point in the history
  • Loading branch information
hleb-albau committed Sep 22, 2017
1 parent 8f43d66 commit e254a3c
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class BitcoinSourceConnectorTask : SourceTask() {
.map { (blockNumber, rawBlock) ->
SourceRecord(
sourcePartition, sourceOffset(blockNumber), IndexTopics.bitcoinSourceTopic,
STRING_SCHEMA, jsonSerializer.writeValueAsString(rawBlock)
null, jsonSerializer.writeValueAsString(rawBlock)
)

}
Expand Down
1 change: 1 addition & 0 deletions devops/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ services:

CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
Expand Down
11 changes: 11 additions & 0 deletions stream-processing/bitcoin/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apply plugin: "application"

dependencies {
compile(project(":core"))
compile(project(":connectors/model"))
compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")
compile("org.apache.kafka:kafka-streams")
compile("org.apache.kafka:kafka-clients")
}

mainClassName = "fund.cyber.index.bitcoin.BitcoinBlockSplitterApplication"
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package fund.cyber.index.bitcoin

import fund.cyber.index.IndexTopics
import fund.cyber.index.btcd.BtcdBlock
import fund.cyber.node.kafka.JsonDeserializer
import fund.cyber.node.kafka.JsonSerializer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.KafkaStreams
import org.slf4j.LoggerFactory

object BitcoinBlockSplitterApplication {

private val log = LoggerFactory.getLogger(BitcoinBlockSplitterApplication::class.java)

@JvmStatic
fun main(args: Array<String>) {

val streamsConfiguration = StreamConfiguration()
val builder = KStreamBuilder()

val bitcoinBlockSerde = Serdes.serdeFrom<BtcdBlock>(JsonSerializer<BtcdBlock>(), JsonDeserializer(BtcdBlock::class.java))

builder.stream<Any, BtcdBlock>(null, bitcoinBlockSerde, IndexTopics.bitcoinSourceTopic)
.flatMapValues { btcdBlock ->
val block = getBlockForStorage(btcdBlock)
listOf(block)
}

val streams = KafkaStreams(builder, streamsConfiguration.streamProperties())

streams.setUncaughtExceptionHandler { thread: Thread, throwable: Throwable ->
log.error("Error during splitting bitcoin block ", throwable)
}
streams.start()
Runtime.getRuntime().addShutdownHook(Thread(streams::close))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package fund.cyber.index.bitcoin

import fund.cyber.index.btcd.BtcdBlock
import fund.cyber.index.btcd.CoinbaseTransactionInput
import fund.cyber.index.btcd.RegularTransactionInput
import fund.cyber.index.btcd.Transaction
import fund.cyber.node.model.BitcoinBlock
import fund.cyber.node.model.BitcoinBlockTransaction
import fund.cyber.node.model.BitcoinBlockTransactionIO
import java.math.BigDecimal
import java.time.Instant

/*data class BitcoinBlockTransaction(
val fee: BigDecimal,
val lock_time: Instant,
val ins: List<BitcoinBlockTransactionIO>,
val outs: List<BitcoinBlockTransactionIO>
)*/


/*-------------------------------------------------------------------------------------*/
/*-------------------------------------------------------------------------------------*/
/*-------------------------------------------------------------------------------------*/
/*-------------------------------------------------------------------------------------*/
/*data class Transaction(
val txid: String,
val hex: String,
val version: Int,
val locktime: Long,
val vout: List<TransactionOutput>,
val vin: List<TransactionInput>
)
data class TransactionOutput(
val value: BigDecimal,
val n: Int,
val scriptPubKey: PubKeyScript
)
sealed class TransactionInput
data class CoinbaseTransactionInput(
val coinbase: String,
val sequence: Long,
val txinwitness: String = ""
) : TransactionInput()
data class RegularTransactionInput(
val txid: String,
val vout: Int,
val scriptSig: SignatureScript,
val sequence: Long,
val txinwitness: String = ""
) : TransactionInput()*/


fun getBlockForStorage(btcdBlock: BtcdBlock): BitcoinBlock {

return BitcoinBlock(
hash = btcdBlock.hash, height = btcdBlock.height, time = Instant.ofEpochMilli(btcdBlock.time),
nonce = btcdBlock.nonce, merkleroot = btcdBlock.merkleroot, size = btcdBlock.size,
version = btcdBlock.version, weight = btcdBlock.weight, difficulty = btcdBlock.difficulty,
bits = btcdBlock.bits, txs = getBlockTransactions(btcdBlock)
)
}

private fun getBlockTransactions(btcdBlock: BtcdBlock): List<BitcoinBlockTransaction> {

return btcdBlock.tx
.map { btcdTransaction ->
BitcoinBlockTransaction(
hash = btcdTransaction.hex, lock_time = Instant.ofEpochMilli(btcdTransaction.locktime),
ins = getTransactionIns(btcdTransaction),
/* todo*/
outs = emptyList(), fee = BigDecimal.ZERO
)
}
}

private fun getTransactionIns(btcdTransaction: Transaction): List<BitcoinBlockTransactionIO> {
val firstTransaction = btcdTransaction.vin[0]
return when (firstTransaction) {
/* todo*/
is RegularTransactionInput -> {
emptyList()
}
is CoinbaseTransactionInput -> {
emptyList()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package fund.cyber.index.bitcoin

import fund.cyber.node.common.env
import org.apache.kafka.streams.StreamsConfig
import java.util.*


class StreamConfiguration(
val kafkaServers: String = env("KAFKA_CONNECTION", "localhost:9092"),
val applicationId: String = "cyber.index.bitcoin.block.splitter"
) {
fun streamProperties(): Properties {
return Properties().apply {
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
}
}
}

0 comments on commit e254a3c

Please sign in to comment.