Skip to content

Commit

Permalink
Tweak subscription emit logging
Browse files Browse the repository at this point in the history
  • Loading branch information
anssip committed Nov 27, 2023
1 parent e661ab5 commit f785562
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
4 changes: 2 additions & 2 deletions application/src/main/resources/application-production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ management:
exposure:
include: health,metrics,info
logging:
root: DEBUG
root: info
level:
com:
npd:
Expand All @@ -49,7 +49,7 @@ logging:
http: error
org:
springframework:
web: DEBUG
web: info
http: error
graphql: error
jpa: info
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.npd.betting.controllers

import com.npd.betting.services.importer.EventImporter
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.time.Duration
Expand All @@ -10,6 +13,7 @@ class AccumulatingSink<T>(
private val clearThreshold: Int = 1000 // Adjust the threshold as needed

) {
val logger: Logger = LoggerFactory.getLogger(EventImporter::class.java)
private val sinkProcessor: Sinks.Many<List<T>> = Sinks.many().multicast().onBackpressureBuffer<List<T>>()
private val accumulatedMessages = mutableListOf<T>()
private val emittedMessages = mutableSetOf<T>()
Expand Down Expand Up @@ -41,6 +45,7 @@ class AccumulatingSink<T>(
accumulatedMessages.add(message)
if (accumulatedMessages.size >= bufferSize) {
val uniqueMessages = accumulatedMessages.filter { it !in emittedMessages }
logger.info("Emitting ${uniqueMessages.size} messages")
emitUniqueMessages(uniqueMessages)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class EventService(
existing.isLive = eventData.isLive()
existing.completed = eventData.completed ?: existing.completed
val saved = eventRepository.save(existing)
println("Event ${saved.id} is now ${if (saved.isLive) "live" else "not live"}. Emitting...")
logger.info("Event ${saved.id} is now ${if (saved.isLive) "live" else "not live"}. Emitting...")
eventStatusUpdatesSink.emit(saved)
} else {
logger.debug("Event ${eventData.id} already exists, skipping...")
Expand Down Expand Up @@ -151,7 +151,7 @@ class EventService(
if (existingMarketOption != null && existingMarketOption.odds != BigDecimal(marketOptionData.price)) {
existingMarketOption.odds = BigDecimal(marketOptionData.price)
existingMarketOption.lastUpdated = Timestamp(marketData.last_update * 1000)

logger.info("Event ${event.id}, market ${marketData.key}, source: ${existingMarket.source}, option ${marketOptionData.name} has been updated")
marketOptionSink.emit(existingMarketOption)
} else {
// is this a valid case?
Expand Down Expand Up @@ -237,6 +237,7 @@ class EventService(
}

fun emitEventStatusUpdate(event: Event) {
logger.info("Emitting event status update for event ${event.id}")
eventStatusUpdatesSink.emit(event)
}

Expand Down

0 comments on commit f785562

Please sign in to comment.