Skip to content

Commit

Permalink
Add support for caching slow queries on large databases.
Browse files Browse the repository at this point in the history
- Add materialized views for list -> subscriber counts, dashboard chart,
  and dashboard aggregate stats that slow down significantly on large
  databases (with millions or tens of millions of subscribers). These
  slow queries involve full table scan COUNTS().

- Add a toggle to enable caching slow results in Settings -> Performance.

- Add support for setting a cron string that crons and periodically
  refreshes aggregated stats in materialized views.

Closes #1019.
  • Loading branch information
knadh committed Jan 27, 2024
1 parent 2f487de commit 5a3664a
Show file tree
Hide file tree
Showing 60 changed files with 587 additions and 136 deletions.
23 changes: 22 additions & 1 deletion cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/Masterminds/sprig/v3"
"github.com/gdgvda/cron"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"
"github.com/knadh/goyesql/v2"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/knadh/listmonk/internal/bounce"
"github.com/knadh/listmonk/internal/bounce/mailbox"
"github.com/knadh/listmonk/internal/captcha"
"github.com/knadh/listmonk/internal/core"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/media"
Expand Down Expand Up @@ -494,14 +496,17 @@ func initTxTemplates(m *manager.Manager, app *App) {
}

// initImporter initializes the bulk subscriber importer.
func initImporter(q *models.Queries, db *sqlx.DB, app *App) *subimporter.Importer {
func initImporter(q *models.Queries, db *sqlx.DB, core *core.Core, app *App) *subimporter.Importer {
return subimporter.New(
subimporter.Options{
DomainBlocklist: app.constants.Privacy.DomainBlocklist,
UpsertStmt: q.UpsertSubscriber.Stmt,
BlocklistStmt: q.UpsertBlocklistSubscriber.Stmt,
UpdateListDateStmt: q.UpdateListsDate.Stmt,
NotifCB: func(subject string, data interface{}) error {
// Refresh cached subscriber counts and stats.
core.RefreshMatViews(true)

app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data)
return nil
},
Expand Down Expand Up @@ -803,6 +808,22 @@ func initCaptcha() *captcha.Captcha {
})
}

func initCron(core *core.Core) {
c := cron.New()
_, err := c.Add(ko.MustString("app.cache_slow_queries_interval"), func() {
lo.Println("refreshing slow query cache")
_ = core.RefreshMatViews(true)
lo.Println("done refreshing slow query cache")
})
if err != nil {
lo.Printf("error initializing slow cache query cron: %v", err)
return
}

c.Start()
lo.Printf("IMPORTANT: database slow query caching is enabled. Aggregate numbers and stats will not be realtime. Next refresh at: %v", c.Entries()[0].Next)
}

func awaitReload(sigChan chan os.Signal, closerWait chan bool, closer func()) chan bool {
// The blocking signal handler that main() waits on.
out := make(chan bool)
Expand Down
8 changes: 7 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func main() {
cOpt := &core.Opt{
Constants: core.Constants{
SendOptinConfirmation: app.constants.SendOptinConfirmation,
CacheSlowQueries: ko.Bool("app.cache_slow_queries"),
},
Queries: queries,
DB: db,
Expand All @@ -208,7 +209,7 @@ func main() {

app.queries = queries
app.manager = initCampaignManager(app.queries, app.constants, app)
app.importer = initImporter(app.queries, db, app)
app.importer = initImporter(app.queries, db, app.core, app)
app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.i18n, app.constants)
initTxTemplates(app.manager, app)

Expand All @@ -233,6 +234,11 @@ func main() {
// Load system information.
app.about = initAbout(queries, db)

// Start cronjobs.
if cOpt.Constants.CacheSlowQueries {
initCron(app.core)
}

// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run()
Expand Down
8 changes: 8 additions & 0 deletions cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"
"unicode/utf8"

"github.com/gdgvda/cron"
"github.com/gofrs/uuid"
"github.com/jmoiron/sqlx/types"
"github.com/knadh/koanf/parsers/json"
Expand Down Expand Up @@ -207,6 +208,13 @@ func handleUpdateSettings(c echo.Context) error {
}
set.DomainBlocklist = doms

// Validate slow query caching cron.
if set.CacheSlowQueries {
if _, err := cron.ParseStandard(set.CacheSlowQueriesInterval); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.Ts("globals.messages.invalidData")+": slow query cron: "+err.Error())
}
}

// Update the settings in the DB.
if err := app.core.UpdateSettings(set); err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log"
"strings"

"github.com/jmoiron/sqlx"
Expand All @@ -18,7 +19,7 @@ import (
// of logic to be performed before executing upgrades. fn is idempotent.
type migFunc struct {
version string
fn func(*sqlx.DB, stuffbin.FileSystem, *koanf.Koanf) error
fn func(*sqlx.DB, stuffbin.FileSystem, *koanf.Koanf, *log.Logger) error
}

// migList is the list of available migList ordered by the semver.
Expand Down Expand Up @@ -69,7 +70,7 @@ func upgrade(db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
// Execute migrations in succession.
for _, m := range toRun {
lo.Printf("running migration %s", m.version)
if err := m.fn(db, fs, ko); err != nil {
if err := m.fn(db, fs, ko, lo); err != nil {
lo.Fatalf("error running migration %s: %v", m.version, err)
}

Expand Down
8 changes: 5 additions & 3 deletions frontend/cypress/e2e/subscribers.cy.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ describe('Subscribers', () => {
// Get the ID from the header and proceed to fill the form.
let id = 0;
cy.get('[data-cy=id]').then(($el) => {
id = $el.text();
id = parseInt($el.text());

cy.get('input[name=email]').clear().type(email);
cy.get('input[name=name]').clear().type(name);
Expand All @@ -162,9 +162,11 @@ describe('Subscribers', () => {
});

// Confirm the edits on the table.
cy.wait(250);
cy.wait(500);
cy.log(rows);
cy.get('tbody tr').each(($el) => {
cy.wrap($el).find('td[data-id]').invoke('attr', 'data-id').then((id) => {
cy.wrap($el).find('td[data-id]').invoke('attr', 'data-id').then((idStr) => {
const id = parseInt(idStr);
cy.wrap($el).find('td[data-label=E-mail]').contains(rows[id].email.toLowerCase());
cy.wrap($el).find('td[data-label=Name]').contains(rows[id].name);
cy.wrap($el).find('td[data-label=Status]').contains(rows[id].status, { matchCase: false });
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/views/Campaign.vue
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
</div>
<div class="column has-text-right">
<a href="https://listmonk.app/docs/templating/#template-expressions" target="_blank" rel="noopener noreferer">
<b-icon icon="code" /> Templating reference</a>
<b-icon icon="code" /> {{ $t('campaigns.templatingRef') }}</a>
<span v-if="canEdit && form.content.contentType !== 'plain'" class="is-size-6 has-text-grey ml-6">
<a v-if="form.altbody === null" href="#" @click.prevent="onAddAltBody">
<b-icon icon="text" size="is-small" /> {{ $t('campaigns.addAltText') }}
Expand All @@ -193,7 +193,6 @@
{{ $t('campaigns.removeAltText') }}
</a>
</span>
</a>
</div>
</div>

Expand Down
9 changes: 9 additions & 0 deletions frontend/src/views/Dashboard.vue
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,21 @@
</div>
</div>
</div><!-- tile block -->
<p v-if="settings['app.cache_slow_queries']" class="has-text-grey">
*{{ $t('globals.messages.slowQueriesCached') }}
<a href="https://listmonk.app/docs/performance/query-caching" target="_blank" rel="noopener noreferer"
class="has-text-grey">
<b-icon icon="link-variant" /> {{ $t('globals.buttons.learnMore') }}
</a>
</p>
</section>
</section>
</template>

<script>
import dayjs from 'dayjs';
import Vue from 'vue';
import { mapState } from 'vuex';
import { colors } from '../constants';
import Chart from '../components/Chart.vue';
Expand Down Expand Up @@ -188,6 +196,7 @@ export default Vue.extend({
},
computed: {
...mapState(['settings']),
dayjs() {
return dayjs;
},
Expand Down
8 changes: 8 additions & 0 deletions frontend/src/views/Lists.vue
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@
<b-modal scroll="keep" :aria-modal="true" :active.sync="isFormVisible" :width="600" @close="onFormClose">
<list-form :data="curItem" :is-editing="isEditing" @finished="formFinished" />
</b-modal>

<p v-if="settings['app.cache_slow_queries']" class="has-text-grey">
*{{ $t('globals.messages.slowQueriesCached') }}
<a href="https://listmonk.app/docs/performance/query-caching" target="_blank" rel="noopener noreferer"
class="has-text-grey">
<b-icon icon="link-variant" /> {{ $t('globals.buttons.learnMore') }}
</a>
</p>
</section>
</template>

Expand Down
24 changes: 24 additions & 0 deletions frontend/src/views/settings/performance.vue
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@
</div>
</div>
</div><!-- sliding window -->

<div>
<hr />
<div class="columns">
<div class="column is-4">
<b-field :label="$t('settings.performance.cacheSlowQueries')"
:message="$t('settings.performance.cacheSlowQueriesHelp')">
<b-switch v-model="data['app.cache_slow_queries']" name="app.cache_slow_queries" />
</b-field>
</div>
<div class="column is-4" :class="{ disabled: !data['app.cache_slow_queries'] }">
<b-field :label="$t('settings.maintenance.cron')">
<b-input v-model="data['app.cache_slow_queries_interval']" :disabled="!data['app.cache_slow_queries']"
placeholder="0 3 * * *" />
</b-field>
</div>
<div class="column">
<br /><br />
<a href="https://listmonk.app/docs/performance/query-caching" target="_blank" rel="noopener noreferer">
<b-icon icon="link-variant" /> {{ $t('globals.buttons.learnMore') }}
</a>
</div>
</div>
</div>
</div>
</template>

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.0 // indirect
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gdgvda/cron v0.2.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gdgvda/cron v0.2.0 h1:oX8qdLZq4tC5StnCsZsTNs2BIzaRjcjmPZ4o+BArKX4=
github.com/gdgvda/cron v0.2.0/go.mod h1:VEwidZXB255kESB5DcUGRWTYZS8KkOBYD1YBn8Wiyx8=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down
4 changes: 4 additions & 0 deletions i18n/ca.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
"globals.messages.notFound": "No s'ha trobat {name} ",
"globals.messages.passwordChange": "Introduïu un valor per canviar",
"globals.messages.passwordChangeFull": "Buida i torna a introduir la contrasenya completa a '{name}'.",
"globals.messages.slowQueriesCached": "Slow queries are being cached. Some numbers on this page will not be up-to-date.",
"globals.messages.updated": "\"{name}\" actualitzat",
"globals.months.1": "gen.",
"globals.months.10": "oct.",
Expand Down Expand Up @@ -433,6 +434,7 @@
"settings.mailserver.username": "Usuari",
"settings.mailserver.waitTimeout": "Espera el timeout",
"settings.mailserver.waitTimeoutHelp": "Temps per esperar una nova activitat en una connexió abans de tancar-la i eliminar-la del grup (s per segon, m per minut).",
"settings.maintenance.cron": "Cron interval",
"settings.media.provider": "Proveïdor",
"settings.media.s3.bucket": "Contenidor",
"settings.media.s3.bucketPath": "Ruta del contenidor",
Expand Down Expand Up @@ -473,6 +475,8 @@
"settings.needsRestart": "La configuració ha canviat. Posa en pausa totes les campanyes en curs i reinicia l'aplicació",
"settings.performance.batchSize": "Mida del lot",
"settings.performance.batchSizeHelp": "El nombre de subscriptors que cal extreure de la base de dades en una sola iteració. Cada iteració extreu subscriptors de la base de dades, els envia missatges i després passa a la següent iteració per extreure el següent lot. Idealment, hauria de ser superior al rendiment màxim possible (concurrency * message_rate).",
"settings.performance.cacheSlowQueries": "Cache slow database queries",
"settings.performance.cacheSlowQueriesHelp": "Enable this on large databases that have slowed down. Caches list subscriber counts, dashboard statistics etc.",
"settings.performance.concurrency": "Concurrència",
"settings.performance.concurrencyHelp": "Màxim treballador concurrent (fils) que intentarà enviar missatges simultàniament.",
"settings.performance.maxErrThreshold": "Llindar d'error màxim",
Expand Down
4 changes: 4 additions & 0 deletions i18n/cs-cz.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
"globals.messages.notFound": "{name} nebyl nalezen",
"globals.messages.passwordChange": "Zadejte hodnotu ke změně",
"globals.messages.passwordChangeFull": "Vymazat a zadat úplné heslo znovu v '{name}'.",
"globals.messages.slowQueriesCached": "Slow queries are being cached. Some numbers on this page will not be up-to-date.",
"globals.messages.updated": "\"{name}\" aktualizován",
"globals.months.1": "Led",
"globals.months.10": "Říj",
Expand Down Expand Up @@ -433,6 +434,7 @@
"settings.mailserver.username": "Jméno uživatele",
"settings.mailserver.waitTimeout": "Časový limit čekání",
"settings.mailserver.waitTimeoutHelp": "Doba čekání na novou aktivitu na připojení před uzavřením a odebráním z fondu (s - sekundy, m - minuty).",
"settings.maintenance.cron": "Cron interval",
"settings.media.provider": "Poskytovatel",
"settings.media.s3.bucket": "Sektor",
"settings.media.s3.bucketPath": "Cesta sektoru",
Expand Down Expand Up @@ -473,6 +475,8 @@
"settings.needsRestart": "Nastavení změněno. Pozastavte všechny spuštěné kampaně a restartujte aplikaci",
"settings.performance.batchSize": "Velikost dávky",
"settings.performance.batchSizeHelp": "Počet odběratelů ke stažení z databáze v jednotlivé iteraci. Každá iterace stáhne odběratele z databáze, odešle jim zprávy a pak se přesune na další iteraci, aby stáhla další dávku. Ideálně by měl být vyšší než je maximální dosažitelná propustnost (souběžnost * četnost_zpráv).",
"settings.performance.cacheSlowQueries": "Cache slow database queries",
"settings.performance.cacheSlowQueriesHelp": "Enable this on large databases that have slowed down. Caches list subscriber counts, dashboard statistics etc.",
"settings.performance.concurrency": "Souběžnost",
"settings.performance.concurrencyHelp": "Maximální počet souběžných modulů worker (podprocesů), které se pokusí současně odeslat zprávy.",
"settings.performance.maxErrThreshold": "Maximální prahová hodnota chyb",
Expand Down
4 changes: 4 additions & 0 deletions i18n/cy.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
"globals.messages.notFound": "Heb ddod o hyd i {enw]",
"globals.messages.passwordChange": "Rhoi gwerth i'w newid",
"globals.messages.passwordChangeFull": "Clirio ac ailgyflwyno'r cyfrinair llawn yn '{name}'.",
"globals.messages.slowQueriesCached": "Slow queries are being cached. Some numbers on this page will not be up-to-date.",
"globals.messages.updated": "Wedi diweddaru “{name}”",
"globals.months.1": "Ion",
"globals.months.10": "Hyd",
Expand Down Expand Up @@ -433,6 +434,7 @@
"settings.mailserver.username": "Enw defnyddiwr",
"settings.mailserver.waitTimeout": "Terfyn amser aros",
"settings.mailserver.waitTimeoutHelp": "Amser aros ar gyfer gweithgaredd newydd ar gysylltiad cyn ei gau a'i ddileu o'r gronfa (e ar gyfer eiliad",
"settings.maintenance.cron": "Cron interval",
"settings.media.provider": "Darparwr",
"settings.media.s3.bucket": "Bwced",
"settings.media.s3.bucketPath": "Llwybr bwced",
Expand Down Expand Up @@ -473,6 +475,8 @@
"settings.needsRestart": "Wedi newid y gosodiadau. Rhewi'r holl ymgyrchoedd byw ac ailgychwyn yr ap",
"settings.performance.batchSize": "Maint y swp",
"settings.performance.batchSizeHelp": "Nifer y tanysgrifwyr y mae modd eu tynnu o'r gronfa ddata ar yr un pryd. Bydd pob iteriad yn tynnu tanysgrifwyr o'r gronfa ddata",
"settings.performance.cacheSlowQueries": "Cache slow database queries",
"settings.performance.cacheSlowQueriesHelp": "Enable this on large databases that have slowed down. Caches list subscriber counts, dashboard statistics etc.",
"settings.performance.concurrency": "Cydamseru",
"settings.performance.concurrencyHelp": "Uchafswm nifer y gweithwyr (llinynnau) a fydd yn ceisio anfon negeseuon yr un pryd.",
"settings.performance.maxErrThreshold": "Uchafswm nifer y gwallau",
Expand Down
4 changes: 4 additions & 0 deletions i18n/da.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
"globals.messages.notFound": "{name} ikke fundet",
"globals.messages.passwordChange": "Indtast en værdi, der skal ændres",
"globals.messages.passwordChangeFull": "Ryd og indtast den fulde adgangskode igen i '{name}'.",
"globals.messages.slowQueriesCached": "Slow queries are being cached. Some numbers on this page will not be up-to-date.",
"globals.messages.updated": "\"{name}\" opdateret",
"globals.months.1": "Jan",
"globals.months.10": "Okt",
Expand Down Expand Up @@ -429,6 +430,7 @@
"settings.mailserver.username": "Brugernavn",
"settings.mailserver.waitTimeout": "Ventetid timeout",
"settings.mailserver.waitTimeoutHelp": "Tid til at vente på ny aktivitet på en forbindelse, før du lukker den og fjerner den fra poolen (s for sekund, m for minut).",
"settings.maintenance.cron": "Cron interval",
"settings.media.provider": "Udbyder",
"settings.media.s3.bucket": "Spand",
"settings.media.s3.bucketPath": "Spand sti",
Expand Down Expand Up @@ -469,6 +471,8 @@
"settings.needsRestart": "Indstillinger ændret. Sæt alle kørende kampagner på pause, og genstart appen",
"settings.performance.batchSize": "Batch størrelse",
"settings.performance.batchSizeHelp": "Antallet af abonnenter, der skal trækkes fra databasen i en enkelt iteration. Hver iteration trækker abonnenter fra databasen, sender meddelelser til dem og går derefter videre til den næste iteration for at trække den næste batch. Dette bør ideelt set være højere end den maksimalt opnåelige gennemstrømning (samtidighed * message_rate).",
"settings.performance.cacheSlowQueries": "Cache slow database queries",
"settings.performance.cacheSlowQueriesHelp": "Enable this on large databases that have slowed down. Caches list subscriber counts, dashboard statistics etc.",
"settings.performance.concurrency": "Samtidighed",
"settings.performance.concurrencyHelp": "Maksimalt antal samtidige arbejdere (tråde), der forsøger at sende meddelelser samtidigt.",
"settings.performance.maxErrThreshold": "Maksimal fejltærskel",
Expand Down
Loading

0 comments on commit 5a3664a

Please sign in to comment.