Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#166 BTC, ETH Supply Service
Browse files Browse the repository at this point in the history
--Basic implementation
  • Loading branch information
hleb-albau committed May 24, 2018
1 parent 0829542 commit cce6f40
Show file tree
Hide file tree
Showing 25 changed files with 593 additions and 54 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ buildscript {
junitVersion = "5.2.0"
junitPlatformVersion = "1.2.0"
mockitoVersion = "2.1.0"
mockitoKotlinVersion = "0.7.0"
mockitoKotlinVersion = "1.5.0"
assertjVersion = "3.9.0"

// metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.kafka.clients.producer.ProducerConfig

// Upper bound for a single Kafka message, in bytes (15 MiB = 15 * 1024 * 1024).
const val KAFKA_MAX_MESSAGE_SIZE_BYTES = 15728640
// Consumer session timeout value in milliseconds (name mirrors the Kafka
// `session.timeout.ms` config key — presumably fed into consumer configs; confirm at call sites).
const val SESSION_TIMEOUT_MS_CONFIG = 30000
// Default timeout for consumer poll() calls, in milliseconds.
const val DEFAULT_POLL_TIMEOUT = 5000L

fun defaultConsumerConfig() = mutableMapOf<String,Any>(
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package fund.cyber.common.kafka

import org.apache.kafka.clients.admin.AdminClient
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.test.context.TestExecutionListeners
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener
import org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener
import org.springframework.test.context.support.DirtiesContextTestExecutionListener
import javax.annotation.PostConstruct


@Tag("kafka-integration")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand All @@ -18,8 +23,25 @@ import org.springframework.test.context.support.DirtiesContextTestExecutionListe
DirtiesContextBeforeModesTestExecutionListener::class,
DirtiesContextTestExecutionListener::class
])
abstract class BaseForKafkaIntegrationTest {
/**
 * Base class for Kafka integration tests: receives the embedded broker from
 * the Spring test context and builds an [AdminClient] against it once
 * dependency injection has completed.
 */
abstract class BaseKafkaIntegrationTest {

// Embedded Kafka broker supplied by the Spring test context.
@Autowired
lateinit var embeddedKafka: KafkaEmbedded

// Admin client pointed at the embedded broker; assigned in [postConstruct].
lateinit var adminClient: AdminClient

@PostConstruct
fun postConstruct() {
    // checkNotNull with a message instead of a bare `!!` so a failure
    // explains which broker address was being used.
    adminClient = checkNotNull(AdminClient.create(adminClientProperties(embeddedKafka.brokersAsString))) {
        "AdminClient.create returned null for brokers ${embeddedKafka.brokersAsString}"
    }
}
}


// Variant of the base test that also starts an embedded Kafka broker.
// Broker properties disable topic auto-creation and set single-node
// replication/ISR values so transactional producers work on one broker;
// the broker address is exposed to the application under KAFKA_BROKERS.
@EmbeddedKafka(
brokerProperties = [
"auto.create.topics.enable=false", "transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"
]
)
@TestPropertySource(properties = ["KAFKA_BROKERS:\${spring.embedded.kafka.brokers}"])
abstract class BaseKafkaIntegrationTestWithStartedKafka : BaseKafkaIntegrationTest()
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package fund.cyber.common.kafka

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
import java.util.*


/**
 * Fetches the broker-side configuration of [topic], or null when the
 * describe result has no entry for that resource.
 *
 * NOTE(review): `.all().get()` blocks with no timeout — presumably fine for
 * a test helper; confirm if reused outside tests.
 */
fun AdminClient.getTopicConfig(topic: String): Config? {
    val resource = ConfigResource(TOPIC, topic)
    return describeConfigs(listOf(resource)).all().get()[resource]
}

/** Builds the minimal [Properties] needed to create an admin client against [kafkaBrokers]. */
fun adminClientProperties(kafkaBrokers: String): Properties {
    val props = Properties()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBrokers
    return props
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fund.cyber.common.kafka.reader

import fund.cyber.common.kafka.BaseForKafkaIntegrationTest
import fund.cyber.common.kafka.BaseKafkaIntegrationTest
import fund.cyber.common.kafka.SinglePartitionTopicDataPresentLatch
import fund.cyber.common.kafka.sendRecords
import org.junit.jupiter.api.*
Expand All @@ -17,7 +17,7 @@ const val EXISTING_TOPIC_WITH_RECORDS_LACK = "EXISTING_EMPTY_TOPIC"
]
)
@DisplayName("Single-partitioned topic lack of records reader test")
class SinglePartitionLackOfRecordsReaderTest : BaseForKafkaIntegrationTest() {
class SinglePartitionLackOfRecordsReaderTest : BaseKafkaIntegrationTest() {


private val itemsCount = 4
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fund.cyber.common.kafka.reader

import fund.cyber.common.kafka.BaseForKafkaIntegrationTest
import fund.cyber.common.kafka.BaseKafkaIntegrationTest
import fund.cyber.common.kafka.SinglePartitionTopicDataPresentLatch
import fund.cyber.common.kafka.sendRecordsInTransaction
import org.junit.jupiter.api.*
Expand All @@ -16,7 +16,7 @@ const val MULTIPLE_TRANSACTION_RECORD_TOPIC = "MULTIPLE_TRANSACTION_RECORD_TOPIC
]
)
@DisplayName("Single-partitioned topic last items reader test")
class SinglePartitionMultipleTransactionRecordReaderTest : BaseForKafkaIntegrationTest() {
class SinglePartitionMultipleTransactionRecordReaderTest : BaseKafkaIntegrationTest() {

private val itemsCount = 4

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fund.cyber.common.kafka.reader

import fund.cyber.common.kafka.BaseForKafkaIntegrationTest
import fund.cyber.common.kafka.BaseKafkaIntegrationTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
Expand All @@ -12,15 +12,15 @@ const val EXISTING_EMPTY_TOPIC = "EXISTING_EMPTY_TOPIC"

@EmbeddedKafka(topics = [EXISTING_EMPTY_TOPIC], partitions = 1)
@DisplayName("Single-partitioned topic without items reader tests")
class SinglePartitionNonRecordsReaderTest : BaseForKafkaIntegrationTest() {
class SinglePartitionNonRecordsReaderTest : BaseKafkaIntegrationTest() {

@Test
@DisplayName("Test non-existing topic")
fun testNonExistingTopic() {

val reader = SinglePartitionTopicLastItemsReader(
kafkaBrokers = embeddedKafka.brokersAsString, topic = NON_EXISTING_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
kafkaBrokers = embeddedKafka.brokersAsString, topic = NON_EXISTING_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
)

val records = reader.readLastRecords(1)
Expand All @@ -32,8 +32,8 @@ class SinglePartitionNonRecordsReaderTest : BaseForKafkaIntegrationTest() {
fun testEmptyTopic() {

val reader = SinglePartitionTopicLastItemsReader(
kafkaBrokers = embeddedKafka.brokersAsString, topic = EXISTING_EMPTY_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
kafkaBrokers = embeddedKafka.brokersAsString, topic = EXISTING_EMPTY_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
)

val records = reader.readLastRecords(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
package fund.cyber.common.kafka.reader

import fund.cyber.common.kafka.BaseForKafkaIntegrationTest
import fund.cyber.common.kafka.BaseKafkaIntegrationTest
import fund.cyber.common.kafka.SinglePartitionTopicDataPresentLatch
import fund.cyber.common.kafka.sendRecordsInTransaction
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.springframework.kafka.test.context.EmbeddedKafka


const val SINGLE_TRANSACTION_RECORD_TOPIC = "SINGLE_TRANSACTION_RECORD_TOPIC"

@EmbeddedKafka(
partitions = 1, topics = [SINGLE_TRANSACTION_RECORD_TOPIC],
brokerProperties = [
"auto.create.topics.enable=false", "transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"
]
partitions = 1, topics = [SINGLE_TRANSACTION_RECORD_TOPIC],
brokerProperties = [
"auto.create.topics.enable=false", "transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"
]
)
@DisplayName("Single-partitioned topic last item reader test")
class SinglePartitionSingeTransactionRecordReaderTest : BaseForKafkaIntegrationTest() {
class SinglePartitionSingeTransactionRecordReaderTest : BaseKafkaIntegrationTest() {


@BeforeEach
fun produceRecords() {

sendRecordsInTransaction(
embeddedKafka.brokersAsString, SINGLE_TRANSACTION_RECORD_TOPIC, listOf("key" to 1)
embeddedKafka.brokersAsString, SINGLE_TRANSACTION_RECORD_TOPIC, listOf("key" to 1)
)

SinglePartitionTopicDataPresentLatch(
embeddedKafka.brokersAsString, SINGLE_TRANSACTION_RECORD_TOPIC, String::class.java, Int::class.java
embeddedKafka.brokersAsString, SINGLE_TRANSACTION_RECORD_TOPIC, String::class.java, Int::class.java
).await()
}

Expand All @@ -38,8 +41,8 @@ class SinglePartitionSingeTransactionRecordReaderTest : BaseForKafkaIntegrationT


val reader = SinglePartitionTopicLastItemsReader(
kafkaBrokers = embeddedKafka.brokersAsString, topic = SINGLE_TRANSACTION_RECORD_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
kafkaBrokers = embeddedKafka.brokersAsString, topic = SINGLE_TRANSACTION_RECORD_TOPIC,
keyClass = String::class.java, valueClass = Int::class.java
)
val records = reader.readLastRecords(1)

Expand Down
1 change: 1 addition & 0 deletions common/src/main/kotlin/fund/cyber/common/Math.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ inline fun Iterable<BigDecimal>.sum(): BigDecimal {

/**
 * Decodes this string into a [Long], accepting the same forms as
 * [java.lang.Long.decode]: decimal, hex ("0x"/"0X"/"#"), and octal
 * (leading "0"), each with an optional sign.
 */
inline fun String.hexToLong(): Long {
    return java.lang.Long.decode(this)
}

// -1 as a reusable constant. The explicit BigDecimal type asserts non-null at
// the Java-interop boundary, replacing the unjustified `!!` on negate().
@Suppress("MagicNumber") val MINUS_ONE: BigDecimal = BigDecimal.ONE.negate()
@Suppress("MagicNumber") val decimal8 = BigDecimal(8)
@Suppress("MagicNumber") val decimal32 = BigDecimal(32)
// Scale used for decimal arithmetic in this module.
const val DECIMAL_SCALE = 18
Expand Down
7 changes: 2 additions & 5 deletions common/src/main/kotlin/fund/cyber/search/configuration/Env.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package fund.cyber.search.configuration

const val DEV_ENVIRONMENT = "DEV_ENVIRONMENT"

const val CHAIN = "CHAIN"
const val CHAIN_FAMILY = "CHAIN_FAMILY"
const val CHAIN_NAME = "CHAIN_NAME"
Expand All @@ -10,9 +8,6 @@ const val CHAIN_NODE_URL = "CHAIN_NODE_URL"
const val KAFKA_BROKERS = "KAFKA_BROKERS"
const val KAFKA_BROKERS_DEFAULT = "localhost:9092"

const val KAFKA_TRANSACTION_BATCH = "KAFKA_TRANSACTION_BATCH"
const val KAFKA_TRANSACTION_BATCH_DEFAULT = 1

const val CASSANDRA_HOSTS = "CASSANDRA_HOSTS"
const val CASSANDRA_HOSTS_DEFAULT = "localhost"

Expand All @@ -38,6 +33,8 @@ const val STACK_CACHE_SIZE_DEFAULT = 100

const val WITH_MEMPOOL = "WITH_MEMPOOL"

const val GENESIS_SUPPLY = "GENESIS_SUPPLY"

const val BITCOIN_TX_OUTS_CACHE_HEAP_SIZE = "BITCOIN_TX_OUTS_CACHE_HEAP_SIZE"
const val BITCOIN_TX_OUTS_CACHE_HEAP_SIZE_DEFAULT = 4

Expand Down
44 changes: 22 additions & 22 deletions common/src/main/kotlin/fund/cyber/search/model/ethereum/Block.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ const val ETHEREUM_CLASSIC_REWARD_CHANGED_BLOCK_NUMBER = 5000000
const val ETHEREUM_REWARD_CHANGED_BLOCK_NUMBER = 4370000

data class EthereumBlock(
override val number: Long, //parsed from hex
val hash: String,
val parentHash: String,
val timestamp: Instant,
val sha3Uncles: String,
val logsBloom: String,
val transactionsRoot: String,
val stateRoot: String,
val receiptsRoot: String,
val minerContractHash: String,
val nonce: Long, //parsed from hex
val difficulty: BigInteger,
val totalDifficulty: BigInteger, //parsed from hex
val extraData: String,
val size: Long, //parsed from hex
val gasLimit: Long, //parsed from hex
val gasUsed: Long, //parsed from hex
val txNumber: Int,
val uncles: List<String> = emptyList(),
val blockReward: BigDecimal,
val unclesReward: BigDecimal,
val txFees: BigDecimal
override val number: Long, //parsed from hex
val hash: String,
val parentHash: String,
val timestamp: Instant,
val sha3Uncles: String,
val logsBloom: String,
val transactionsRoot: String,
val stateRoot: String,
val receiptsRoot: String,
val minerContractHash: String,
val nonce: Long, //parsed from hex
val difficulty: BigInteger,
val totalDifficulty: BigInteger, //parsed from hex
val extraData: String,
val size: Long, //parsed from hex
val gasLimit: Long, //parsed from hex
val gasUsed: Long, //parsed from hex
val txNumber: Int,
val uncles: List<String> = emptyList(),
val blockReward: BigDecimal,
val unclesReward: BigDecimal, //including uncles reward, todo rename
val txFees: BigDecimal
) : BlockEntity

//todo: 1) add properly support of new classic fork. 2) add support of custom reward functions in forks
Expand Down
14 changes: 14 additions & 0 deletions common/src/main/kotlin/fund/cyber/search/model/ethereum/Supply.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package fund.cyber.search.model.ethereum

import java.math.BigDecimal


/**
 * Snapshot of ether supply at a given chain height.
 *
 * NOTE(review): field semantics are inferred from names — confirm against the
 * supply-service producer: [genesisSupply] presumably ether allocated at
 * genesis, [miningBlocksSupply]/[miningUnclesSupply] rewards minted for
 * blocks/uncles, [totalSupply] the overall amount, and
 * [includingUnclesSupply] a total that also counts uncle rewards.
 */
data class EthereumSupply(
val blockNumber: Long,
val uncleNumber: Long,
val totalSupply: BigDecimal,
val genesisSupply: BigDecimal,
val miningBlocksSupply: BigDecimal,
val miningUnclesSupply: BigDecimal,
val includingUnclesSupply: BigDecimal
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package fund.cyber.search.model.events

import fund.cyber.search.model.chains.Chain
import fund.cyber.search.model.chains.ChainInfo


// Kafka topic names for a chain's pump pipelines, derived from the chain name.
val Chain.txPumpTopic: String get() = "${name}_TX_PUMP"
val Chain.blockPumpTopic: String get() = "${name}_BLOCK_PUMP"
val Chain.unclePumpTopic: String get() = "${name}_UNCLE_PUMP"


// Kafka topic names for a chain's pump/supply pipelines, derived from the
// chain's full name.
val ChainInfo.txPumpTopic: String get() = "${fullName}_TX_PUMP"
val ChainInfo.blockPumpTopic: String get() = "${fullName}_BLOCK_PUMP"
val ChainInfo.unclePumpTopic: String get() = "${fullName}_UNCLE_PUMP"
val ChainInfo.supplyTopic: String get() = "${fullName}_SUPPLY"
2 changes: 1 addition & 1 deletion docs/api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM swaggerapi/swagger-ui:v3.12.1
FROM swaggerapi/swagger-ui:v3.14.2

COPY docs/api/common.v1.yaml /usr/share/nginx/html/
COPY docs/api/search.v1.yaml /usr/share/nginx/html/
Expand Down
2 changes: 1 addition & 1 deletion docs/api/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Build and Run locally
```bash
docker build -t build/raw-api -f ./docs/Dockerfile ./ && docker run -p 8080:8080 build/raw-api
docker build -t build/raw-api -f ./docs/api/Dockerfile ./ && docker run -p 8080:8080 build/raw-api
```
Loading

0 comments on commit cce6f40

Please sign in to comment.