From f82828bdb624f6cc02699b99a349feafd7ab53e6 Mon Sep 17 00:00:00 2001 From: Vyacheslav Dobrynin Date: Thu, 6 Apr 2023 23:00:42 +0400 Subject: [PATCH] fix: join chunks --- build.gradle.kts | 4 +- .../bert/BertHyperParameterBenchmark.kt | 51 +++++++++---------- .../itmo/stand/command/SaveInBatchCommand.kt | 2 +- .../ru/itmo/stand/config/StandProperties.kt | 1 - .../service/bert/BertEmbeddingCalculator.kt | 1 + .../neighbours/indexing/VectorIndexBuilder.kt | 18 +++---- .../kotlin/ru/itmo/stand/util/Concurrency.kt | 5 +- src/main/resources/application.yml | 1 - .../stand/fixtures/StandPropertiesBuilder.kt | 2 - 9 files changed, 40 insertions(+), 45 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 1ab7268f..2a7eb390 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -67,12 +67,12 @@ tasks.withType { } jmh { - includes.set(listOf("BertHyperParameterBenchmark")) // include pattern (regular expression) for benchmarks to be executed + includes.set(listOf(".*")) // include pattern (regular expression) for benchmarks to be executed warmupIterations.set(2) // Number of warmup iterations to do iterations.set(2) // Number of measurement iterations to do fork.set(2) // How many times to forks a single benchmark. Use 0 to disable forking altogether zip64.set(true) // is used for big archives (more than 65535 entries) - resultsFile.set(project.file("${project.buildDir}/reports/jmh/results.txt")) // results file + resultsFile.set(project.file("${project.buildDir}/outputs/jmh/results.txt")) // results file } ktlint { diff --git a/src/jmh/kotlin/ru/itmo/stand/service/bert/BertHyperParameterBenchmark.kt b/src/jmh/kotlin/ru/itmo/stand/service/bert/BertHyperParameterBenchmark.kt index 7f43a2ee..ed0f9223 100644 --- a/src/jmh/kotlin/ru/itmo/stand/service/bert/BertHyperParameterBenchmark.kt +++ b/src/jmh/kotlin/ru/itmo/stand/service/bert/BertHyperParameterBenchmark.kt @@ -50,7 +50,6 @@ open class BertHyperParameterBenchmark { private val testContents = generateWindows() -/* @Benchmark fun singleThreadBenchmark_100(): Array { return singleThreadBenchmark(100) @@ -75,14 +74,12 @@ open class BertHyperParameterBenchmark { fun singleThreadBenchmark_2000(): Array { return singleThreadBenchmark(2000) } -*/ -// @Benchmark -// fun singleThreadBenchmark_5000(): Array { -// return singleThreadBenchmark(5000) -// } + @Benchmark + fun singleThreadBenchmark_5000(): Array { + return singleThreadBenchmark(5000) + } -/* @Benchmark fun singleThreadBenchmark_10_000(): Array { return singleThreadBenchmark(10_000) @@ -106,7 +103,7 @@ open class BertHyperParameterBenchmark { @Benchmark fun singleThreadBenchmark_50_000(): Array { return singleThreadBenchmark(50_000) - }*/ + } @Benchmark fun multithreadedBenchmark_4_5000(): Array { @@ -120,28 +117,30 @@ open class BertHyperParameterBenchmark { }.toTypedArray() } - private fun multithreadedBenchmark(batchSize: Int, numThreads: Int): Array = runBlocking(Dispatchers.Default) { - val counter = AtomicInteger(0) - val chan = Channel>(numThreads) - repeat(numThreads) { - launch { - val predictor = tinyModel.newPredictor() - for (data in chan) { - counter.incrementAndGet() - predictor.predict(data.toTypedArray()) + private fun multithreadedBenchmark(batchSize: Int, numThreads: Int): Array = + runBlocking(Dispatchers.Default) { + val counter = AtomicInteger(0) + val chan = Channel>(numThreads) + repeat(numThreads) { + launch { + val predictor = tinyModel.newPredictor() + for (data in chan) { + counter.incrementAndGet() + predictor.predict(data.toTypedArray()) + } + predictor.close() } - predictor.close() } - } - for (data in testContents.chunked(batchSize)) { - chan.send(data) - } - while (!chan.isEmpty) {} - chan.close() + for (data in testContents.chunked(batchSize)) { + chan.send(data) + } + while (!chan.isEmpty) { + } + chan.close() - arrayOf() - } + arrayOf() + } private fun generateWindows(count: Int = 50_000): List { val result = mutableListOf() diff --git a/src/main/kotlin/ru/itmo/stand/command/SaveInBatchCommand.kt b/src/main/kotlin/ru/itmo/stand/command/SaveInBatchCommand.kt index a0e342a9..14b13ce3 100644 --- a/src/main/kotlin/ru/itmo/stand/command/SaveInBatchCommand.kt +++ b/src/main/kotlin/ru/itmo/stand/command/SaveInBatchCommand.kt @@ -45,7 +45,7 @@ class SaveInBatchCommand( override fun run() { val contents = Paths.get(contentFile.path) - .bufferedReader(bufferSize = standProperties.app.fileLoadBufferSizeKb * 1024) + .bufferedReader() .lineSequence() val seconds = measureTimeSeconds { diff --git a/src/main/kotlin/ru/itmo/stand/config/StandProperties.kt b/src/main/kotlin/ru/itmo/stand/config/StandProperties.kt index 5f72732a..b7186c26 100644 --- a/src/main/kotlin/ru/itmo/stand/config/StandProperties.kt +++ b/src/main/kotlin/ru/itmo/stand/config/StandProperties.kt @@ -16,7 +16,6 @@ data class StandProperties @ConstructorBinding constructor( val basePath: String, val bertMultiToken: BertMultiToken, val neighboursAlgorithm: NeighboursAlgorithm, - val fileLoadBufferSizeKb: Int, ) data class BertMultiToken( diff --git a/src/main/kotlin/ru/itmo/stand/service/bert/BertEmbeddingCalculator.kt b/src/main/kotlin/ru/itmo/stand/service/bert/BertEmbeddingCalculator.kt index 85436678..1130b734 100644 --- a/src/main/kotlin/ru/itmo/stand/service/bert/BertEmbeddingCalculator.kt +++ b/src/main/kotlin/ru/itmo/stand/service/bert/BertEmbeddingCalculator.kt @@ -9,6 +9,7 @@ class BertEmbeddingCalculator( private val standProperties: StandProperties, ) { + // TODO: configure to return vector for middle token private val predictor by lazy { bertModelLoader.loadModel(standProperties.app.neighboursAlgorithm.bertModelType).newPredictor() } diff --git a/src/main/kotlin/ru/itmo/stand/service/impl/neighbours/indexing/VectorIndexBuilder.kt b/src/main/kotlin/ru/itmo/stand/service/impl/neighbours/indexing/VectorIndexBuilder.kt index f8241e15..3f76833b 100644 --- a/src/main/kotlin/ru/itmo/stand/service/impl/neighbours/indexing/VectorIndexBuilder.kt +++ b/src/main/kotlin/ru/itmo/stand/service/impl/neighbours/indexing/VectorIndexBuilder.kt @@ -23,9 +23,9 @@ class VectorIndexBuilder( private val log = LoggerFactory.getLogger(javaClass) fun index(windowedTokensFile: File) { + log.info("Starting vector indexing") val windowsByTokenPairs = readWindowsByTokenPairs(windowedTokensFile) - log.info("starting vector indexing") val counter = AtomicInteger(0) val clusterSizes = AtomicInteger(0) val windowsCount = AtomicInteger(0) @@ -37,11 +37,11 @@ class VectorIndexBuilder( counter.incrementAndGet() } - log.info("token count: ${counter.get()}") - log.info("cluster sizes: ${clusterSizes.get()}") - log.info("windows count: ${windowsCount.get()}") - log.info("mean windows per token: ${windowsCount.get().toDouble() / counter.get().toDouble()}") - log.info("mean cluster size is ${clusterSizes.get() / counter.get().toFloat()}") + log.info("Token count: ${counter.get()}") + log.info("Cluster sizes: ${clusterSizes.get()}") + log.info("Windows count: ${windowsCount.get()}") + log.info("Mean windows per token: ${windowsCount.get().toDouble() / counter.get().toDouble()}") + log.info("Mean cluster size is ${clusterSizes.get() / counter.get().toFloat()}") } private fun readWindowsByTokenPairs(windowedTokensFile: File) = windowedTokensFile @@ -53,7 +53,7 @@ class VectorIndexBuilder( val windows = tokenAndWindows[1] .split(WINDOWS_SEPARATOR) .filter { it.isNotBlank() } - .take(1000) + .take(1000) // TODO: configure this value token to windows.map { it.split(WINDOW_DOC_IDS_SEPARATOR).first() } } @@ -64,9 +64,9 @@ class VectorIndexBuilder( val doubleEmb = embeddings.toDoubleArray() - val clusterModel = XMeans.fit(doubleEmb, 8) + val clusterModel = XMeans.fit(doubleEmb, 8) // TODO: configure this value - log.info("{} got centroids {}", token.first, clusterModel.k) + log.info("{} got {} centroids", token.first, clusterModel.k) val centroids = clusterModel.centroids diff --git a/src/main/kotlin/ru/itmo/stand/util/Concurrency.kt b/src/main/kotlin/ru/itmo/stand/util/Concurrency.kt index 753b01f7..ae31bfc5 100644 --- a/src/main/kotlin/ru/itmo/stand/util/Concurrency.kt +++ b/src/main/kotlin/ru/itmo/stand/util/Concurrency.kt @@ -10,12 +10,11 @@ fun processParallel(data: Sequence, numWorkers: Int, log: Logger, action: data .onEachIndexed { index, _ -> if (index % 10 == 0) log.info("Elements processed: {}", index) } .chunked(numWorkers) - .mapIndexed { index, chunk -> + .forEach { chunk -> chunk.map { launch { action(it) } - } + }.joinAll() } - .forEach { it.joinAll() } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 28c5a962..e4155a98 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -16,7 +16,6 @@ stand: elasticsearch.host-and-port: localhost:9200 app: base-path: "." - file-load-buffer-size-kb: 512 neighbours-algorithm: token-batch-size: 5 bert-model-type: TINY diff --git a/src/test/kotlin/ru/itmo/stand/fixtures/StandPropertiesBuilder.kt b/src/test/kotlin/ru/itmo/stand/fixtures/StandPropertiesBuilder.kt index c6623271..f187b47c 100644 --- a/src/test/kotlin/ru/itmo/stand/fixtures/StandPropertiesBuilder.kt +++ b/src/test/kotlin/ru/itmo/stand/fixtures/StandPropertiesBuilder.kt @@ -12,13 +12,11 @@ fun standProperties( basePath: String = ".", bertMultiTokenBatchSize: Int = 5, neighboursAlgorithmBatchSize: Int = 5, - fileLoadBufferSizeMb: Int = 512, ) = StandProperties( ElasticsearchProperties(elkHostAndPort), ApplicationProperties( basePath, BertMultiToken(bertMultiTokenBatchSize), NeighboursAlgorithm(neighboursAlgorithmBatchSize, BertModelType.BASE, 500_000), - fileLoadBufferSizeMb, ), )