Skip to content

Commit

Permalink
Add Importer command #dockerpush #latest
Browse files Browse the repository at this point in the history
  • Loading branch information
smaugfm committed Aug 18, 2024
1 parent 30ae143 commit b1aa7c1
Show file tree
Hide file tree
Showing 37 changed files with 500 additions and 174 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-onpush.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 11
java-version: 17
cache: gradle
- name: docker login
run: |
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ out/

settings*.json
settings*.yml
import-config*.yml
retries.json
docker-compose.yml
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ dependencies {
implementation(kotlin("stdlib"))
implementation(kotlin("reflect"))
implementation("io.github.smaugfm:monobank:0.0.2")
implementation("io.github.smaugfm:lunchmoney:1.0.3-SNAPSHOT")
implementation("io.github.smaugfm:lunchmoney:1.1.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinxCoroutines")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutines")
implementation("com.github.livefront.sealed-enum:runtime:$sealedEnum")
Expand All @@ -54,7 +54,7 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson")
implementation("de.brudaswen.kotlinx.serialization:kotlinx-serialization-csv:2.0.0")
implementation("com.charleskorn.kaml:kaml:0.55.0")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.4.1")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.0")
implementation("io.github.oshai:kotlin-logging:5.1.0")
implementation("com.google.code.gson:gson:2.10.1")
testImplementation("io.mockk:mockk:1.13.8")
Expand Down
57 changes: 10 additions & 47 deletions src/main/kotlin/io/github/smaugfm/monobudget/Application.kt
Original file line number Diff line number Diff line change
@@ -1,71 +1,34 @@
package io.github.smaugfm.monobudget

import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smaugfm.monobudget.common.exception.BudgetBackendException
import io.github.smaugfm.monobudget.common.BaseApplication
import io.github.smaugfm.monobudget.common.notify.TelegramApi
import io.github.smaugfm.monobudget.common.notify.TelegramCallbackHandler
import io.github.smaugfm.monobudget.common.startup.ApplicationStartupVerifier
import io.github.smaugfm.monobudget.common.statement.StatementSource
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementEvents
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementItemProcessor
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementProcessingScopeComponent
import io.github.smaugfm.monobudget.common.telegram.TelegramApi
import io.github.smaugfm.monobudget.common.telegram.TelegramCallbackHandler
import io.github.smaugfm.monobudget.common.util.injectAll
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject
import kotlin.system.exitProcess

private val log = KotlinLogging.logger {}

class Application<TTransaction, TNewTransaction> :
KoinComponent {
BaseApplication<TTransaction, TNewTransaction>() {
override val statementSources: List<StatementSource> by injectAll<StatementSource>()
private val telegramApi by inject<TelegramApi>()
private val statementSources by injectAll<StatementSource>()
private val startupVerifiers by injectAll<ApplicationStartupVerifier>()
private val telegramCallbackHandler by inject<TelegramCallbackHandler<TTransaction>>()
private val statementEvents by inject<StatementEvents>()

suspend fun run() {
runStartupChecks()

statementSources.forEach { it.prepare() }

telegramApi.start(telegramCallbackHandler::handle)
log.info { "Started application" }

statementSources.asFlow()
.flatMapMerge { it.statements() }
.filter(statementEvents::onNewStatement)
.map(::StatementProcessingScopeComponent)
.onEach {
with(it) {
try {
scope.get<StatementItemProcessor<TTransaction, TNewTransaction>>()
.process()
statementEvents.onStatementEnd(ctx)
} catch (e: BudgetBackendException) {
statementEvents.onStatementRetry(ctx, e)
} catch (e: Throwable) {
statementEvents.onStatementError(ctx, e)
} finally {
scope.close()
}
}
}
.collect()
}

private suspend fun runStartupChecks() {
override suspend fun beforeStart() {
try {
startupVerifiers.forEach { it.verify() }
} catch (e: Throwable) {
log.error(e) { "Failed to start application. Exiting..." }
exitProcess(1)
}
}

override suspend fun afterSourcesPrepare() {
telegramApi.start(telegramCallbackHandler::handle)
}
}
9 changes: 8 additions & 1 deletion src/main/kotlin/io/github/smaugfm/monobudget/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.github.smaugfm.monobudget.common.model.settings.MonoAccountSettings
import io.github.smaugfm.monobudget.common.model.settings.Settings
import io.github.smaugfm.monobudget.common.retry.JacksonFileStatementRetryRepository
import io.github.smaugfm.monobudget.common.retry.StatementRetryRepository
import io.github.smaugfm.monobudget.import.ImporterApplication
import io.github.smaugfm.monobudget.lunchmoney.LunchmoneyModule
import io.github.smaugfm.monobudget.mono.MonoApi
import io.github.smaugfm.monobudget.mono.MonoModule
Expand All @@ -37,7 +38,13 @@ private val log = KotlinLogging.logger {}

private const val DEFAULT_HTTP_PORT = 80

fun main() {
fun main(args: Array<String>) {
if (args.size == 1 && args[0] == "importer") {
return runBlocking {
ImporterApplication.main(this)
}
}

val env = System.getenv()
val setWebhook = env["SET_WEBHOOK"]?.toBoolean() ?: false
val monoWebhookUrl = URI(env["MONO_WEBHOOK_URL"]!!)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.github.smaugfm.monobudget.common

import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smaugfm.monobudget.common.exception.BudgetBackendException
import io.github.smaugfm.monobudget.common.statement.StatementSource
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementEvents
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementItemProcessor
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementProcessingScopeComponent
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject

private val log = KotlinLogging.logger {}

abstract class BaseApplication<TTransaction, TNewTransaction> : KoinComponent {
protected abstract val statementSources: List<StatementSource>
private val statementEvents by inject<StatementEvents>()

open suspend fun run() {
beforeStart()

statementSources.forEach { it.prepare() }

afterSourcesPrepare()

log.info { "Started application" }

statementSources.asFlow()
.flatMapMerge { it.statements() }
.filter(statementEvents::onNewStatement)
.map(::StatementProcessingScopeComponent)
.onEach {
with(it) {
try {
scope.get<StatementItemProcessor<TTransaction, TNewTransaction>>()
.process()
statementEvents.onStatementEnd(ctx)
} catch (e: BudgetBackendException) {
statementEvents.onStatementRetry(ctx, e)
} catch (e: Throwable) {
statementEvents.onStatementError(ctx, e)
} finally {
scope.close()
}
}
}
.collect()
}

protected open suspend fun beforeStart() {
}

protected open suspend fun afterSourcesPrepare() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,31 @@ package io.github.smaugfm.monobudget.common.account
import io.github.smaugfm.monobudget.common.model.financial.StatementItem
import io.github.smaugfm.monobudget.common.util.misc.ConcurrentExpiringMap
import kotlinx.coroutines.Deferred
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration

abstract class TransferCache<TTransaction> :
ConcurrentExpiringMap<StatementItem, Deferred<TTransaction>>(1.minutes)
interface TransferCache<TTransaction> {
suspend fun getEntries(item: StatementItem): Set<Map.Entry<StatementItem, Deferred<TTransaction>>>

suspend fun put(
item: StatementItem,
transaction: Deferred<TTransaction>,
)

open class Expiring<TTransaction>(expirationDuration: Duration) :
TransferCache<TTransaction> {
private val cache = ConcurrentExpiringMap<StatementItem, Deferred<TTransaction>>(expirationDuration)

override suspend fun getEntries(
item: StatementItem,
): Set<Map.Entry<StatementItem, Deferred<TTransaction>>> {
return cache.entries
}

override suspend fun put(
item: StatementItem,
transaction: Deferred<TTransaction>,
) {
cache.add(item, transaction)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,32 @@ package io.github.smaugfm.monobudget.common.account
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smaugfm.monobudget.common.model.financial.StatementItem
import io.github.smaugfm.monobudget.common.statement.lifecycle.StatementProcessingContext
import io.github.smaugfm.monobudget.common.util.misc.ConcurrentExpiringMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred

private val log = KotlinLogging.logger {}

abstract class TransferDetector<TTransaction>(
private val bankAccounts: BankAccountService,
private val ctx: StatementProcessingContext,
private val cache: ConcurrentExpiringMap<StatementItem, Deferred<TTransaction>>,
private val cache: TransferCache<TTransaction>,
) {
suspend fun checkForTransfer(): MaybeTransfer<TTransaction> =
ctx.getOrPut("transfer") {
val existingTransfer =
cache.entries.firstOrNull { (recentStatementItem) ->
cache.getEntries(ctx.item).firstOrNull { (recentStatementItem) ->
checkIsTransferTransactions(recentStatementItem)
}?.value?.await()

if (existingTransfer != null) {
log.debug {
log.info {
"Found matching transfer transaction.\n" +
"Current: ${ctx.item}\n" +
"Recent transfer: $existingTransfer"
}
MaybeTransfer.Transfer(ctx.item, existingTransfer)
} else {
val deferred = CompletableDeferred<TTransaction>()
cache.add(ctx.item, deferred)
cache.put(ctx.item, deferred)

MaybeTransfer.NotTransfer(ctx.item, deferred)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.smaugfm.monobudget.common.model.serializer

import kotlinx.datetime.LocalDate
import kotlinx.datetime.toLocalDate
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.PrimitiveKind
import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
Expand All @@ -23,6 +22,6 @@ class LocalDateAsISOSerializer : KSerializer<LocalDate> {
}

override fun deserialize(decoder: Decoder): LocalDate {
return decoder.decodeString().toLocalDate()
return LocalDate.parse(decoder.decodeString())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.smaugfm.monobudget.common.notify

import io.github.smaugfm.monobudget.common.model.financial.BankAccountId
import io.github.smaugfm.monobudget.common.model.telegram.MessageWithReplyKeyboard

interface StatementItemNotificationSender {
suspend fun notify(
accountId: BankAccountId,
newMessage: MessageWithReplyKeyboard,
)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.smaugfm.monobudget.common.telegram
package io.github.smaugfm.monobudget.common.notify

import com.elbekd.bot.Bot
import com.elbekd.bot.model.ChatId
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.smaugfm.monobudget.common.telegram
package io.github.smaugfm.monobudget.common.notify

import com.elbekd.bot.model.ChatId
import com.elbekd.bot.model.TelegramApiError
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.smaugfm.monobudget.common.telegram
package io.github.smaugfm.monobudget.common.notify

import com.elbekd.bot.model.ChatId
import com.elbekd.bot.types.CallbackQuery
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.smaugfm.monobudget.common.telegram
package io.github.smaugfm.monobudget.common.notify

import com.elbekd.bot.model.ChatId
import com.elbekd.bot.types.ParseMode
Expand All @@ -11,11 +11,11 @@ import org.koin.core.annotation.Single
private val log = KotlinLogging.logger {}

@Single
class TelegramMessageSender(
class TelegramNotificationSender(
private val bankAccounts: BankAccountService,
private val telegramApi: TelegramApi,
) {
suspend fun send(
) : StatementItemNotificationSender {
override suspend fun notify(
accountId: BankAccountId,
newMessage: MessageWithReplyKeyboard,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.github.smaugfm.monobudget.common.statement.lifecycle
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smaugfm.monobudget.common.account.BankAccountService
import io.github.smaugfm.monobudget.common.account.TransferDetector
import io.github.smaugfm.monobudget.common.telegram.TelegramMessageSender
import io.github.smaugfm.monobudget.common.notify.StatementItemNotificationSender
import io.github.smaugfm.monobudget.common.transaction.TransactionFactory
import io.github.smaugfm.monobudget.common.transaction.TransactionMessageFormatter
import io.github.smaugfm.monobudget.common.util.pp
Expand All @@ -16,7 +16,7 @@ abstract class StatementItemProcessor<TTransaction, TNewTransaction>(
private val bankAccounts: BankAccountService,
private val transferDetector: TransferDetector<TTransaction>,
private val messageFormatter: TransactionMessageFormatter<TTransaction>,
private val telegramMessageSender: TelegramMessageSender,
private val notificationSender: StatementItemNotificationSender,
) {
suspend fun process() {
logStatement()
Expand All @@ -30,7 +30,7 @@ abstract class StatementItemProcessor<TTransaction, TNewTransaction>(
val transaction = transactionFactory.create(maybeTransfer)
val message = messageFormatter.format(ctx.item, transaction)

telegramMessageSender.send(ctx.item.accountId, message)
notificationSender.notify(ctx.item.accountId, message)
}

private suspend fun logStatement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ abstract class NewTransactionFactory<TNewTransaction> : KoinComponent {

companion object {
@JvmStatic
protected fun StatementItem?.formatDescription() = (this?.description ?: "").replaceNewLines()
fun StatementItem?.formatDescription() = (this?.description ?: "").replaceNewLines()
}
}
Loading

0 comments on commit b1aa7c1

Please sign in to comment.