Skip to content

Commit

Permalink
Added parameters for Clickhouse flush interval and max buffer size (p…
Browse files Browse the repository at this point in the history
…lausible#1073)

Currently, both are hard-coded to relatively small values.
This may cause an unnecessary high load on the Clickhouse instance.
It makes sense to make those configurable.
  • Loading branch information
ameshkov authored May 25, 2021
1 parent 6228995 commit 90cfed0
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
- CSV export now includes pageviews, bounce rate and visit duration in addition to visitors plausible/analytics#952
- Send stats to multiple dashboards by configuring a comma-separated list of domains plausible/analytics#968
- Time on Page metric available in detailed Top Pages report plausible/analytics#1007
- Added `CLICKHOUSE_FLUSH_INTERVAL_MS` and `CLICKHOUSE_MAX_BUFFER_SIZE` configuration parameters

### Fixed
- Fix weekly report time range plausible/analytics#951
Expand Down
7 changes: 6 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ app_version = System.get_env("APP_VERSION", "0.0.1")
ch_db_url =
System.get_env("CLICKHOUSE_DATABASE_URL", "http://plausible_events_db:8123/plausible_events_db")

{ch_flush_interval_ms, ""} = Integer.parse(System.get_env("CLICKHOUSE_FLUSH_INTERVAL_MS", "5000"))
{ch_max_buffer_size, ""} = Integer.parse(System.get_env("CLICKHOUSE_MAX_BUFFER_SIZE", "10000"))

### Mandatory params End

sentry_dsn = System.get_env("SENTRY_DSN")
Expand Down Expand Up @@ -129,7 +132,9 @@ config :plausible, Plausible.ClickhouseRepo,
loggers: [Ecto.LogEntry],
queue_target: 500,
queue_interval: 2000,
url: ch_db_url
url: ch_db_url,
flush_interval_ms: ch_flush_interval_ms,
max_buffer_size: ch_max_buffer_size

case mailer_adapter do
"Bamboo.PostmarkAdapter" ->
Expand Down
20 changes: 13 additions & 7 deletions lib/plausible/event/write_buffer.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
defmodule Plausible.Event.WriteBuffer do
use GenServer
require Logger
@flush_interval_ms 5_000
@max_buffer_size 10_000

def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
timer = Process.send_after(self(), :tick, flush_interval_ms())
{:ok, %{buffer: buffer, timer: timer}}
end

Expand All @@ -27,11 +25,11 @@ defmodule Plausible.Event.WriteBuffer do
def handle_cast({:insert, event}, %{buffer: buffer} = state) do
new_buffer = [event | buffer]

if length(new_buffer) >= @max_buffer_size do
if length(new_buffer) >= max_buffer_size() do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(state[:timer])
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
new_timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
Expand All @@ -40,14 +38,14 @@ defmodule Plausible.Event.WriteBuffer do

def handle_info(:tick, %{buffer: buffer}) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: timer}}
end

def handle_call(:flush, _from, %{buffer: buffer} = state) do
Process.cancel_timer(state[:timer])
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
new_timer = Process.send_after(self(), :tick, flush_interval_ms())
{:reply, nil, %{buffer: [], timer: new_timer}}
end

Expand All @@ -67,4 +65,12 @@ defmodule Plausible.Event.WriteBuffer do
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end

defp flush_interval_ms() do
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :flush_interval_ms)
end

defp max_buffer_size() do
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :max_buffer_size)
end
end
18 changes: 12 additions & 6 deletions lib/plausible/session/write_buffer.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
defmodule Plausible.Session.WriteBuffer do
use GenServer
require Logger
@flush_interval_ms 1000
@max_buffer_size 10_000

def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
timer = Process.send_after(self(), :tick, flush_interval_ms())
{:ok, %{buffer: buffer, timer: timer}}
end

Expand All @@ -22,11 +20,11 @@ defmodule Plausible.Session.WriteBuffer do
def handle_cast({:insert, sessions}, %{buffer: buffer} = state) do
new_buffer = sessions ++ buffer

if length(new_buffer) >= @max_buffer_size do
if length(new_buffer) >= max_buffer_size() do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(state[:timer])
flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
new_timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
Expand All @@ -35,7 +33,7 @@ defmodule Plausible.Session.WriteBuffer do

def handle_info(:tick, %{buffer: buffer}) do
flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: timer}}
end

Expand All @@ -60,4 +58,12 @@ defmodule Plausible.Session.WriteBuffer do
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseSession, sessions)
end
end

defp flush_interval_ms() do
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :flush_interval_ms)
end

defp max_buffer_size() do
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :max_buffer_size)
end
end

0 comments on commit 90cfed0

Please sign in to comment.