Skip to content

Commit

Permalink
fix: handle channel shutdown; fix other exit reasons
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Jan 29, 2025
1 parent a49619e commit 5275ddc
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 30 deletions.
12 changes: 8 additions & 4 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ defmodule Realtime.Tenants.Connect do
{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, :killed} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
{:error, :tenant_database_unavailable}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:error, :tenant_database_unavailable}
Expand Down Expand Up @@ -183,12 +187,12 @@ defmodule Realtime.Tenants.Connect do
else
error ->
log_error("MigrationsFailedToRun", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end
rescue
error ->
log_error("MigrationsFailedToRun", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end

def handle_continue(:start_listen_and_replication, state) do
Expand All @@ -201,12 +205,12 @@ defmodule Realtime.Tenants.Connect do
else
{:error, error} ->
log_error("StartListenAndReplicationFailed", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end
rescue
error ->
log_error("StartListenAndReplicationFailed", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end

@impl true
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ defmodule Realtime.Tenants.ReplicationConnection do

@impl true
def handle_disconnect(state) do
Logger.warning("Disconnecting broadcast changes handler: #{inspect(state, pretty: true)}")
Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")
{:noreply, %{state | step: :disconnected}}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ defmodule RealtimeWeb.RealtimeChannel do
metadata = log_metadata(access_token)
push_system_message("system", socket, "error", message, channel_name)
log_warning("ChannelShutdown", message, metadata)
{:stop, :shutdown, socket}
{:stop, :normal, socket}
end

defp push_system_message(extension, socket, status, error, channel_name)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.12",
version: "2.34.13",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/.template.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
PROJECT_URL=
PROJECT_ANON_TOKEN=
PROJECT_JWT_SECRET
PROJECT_JWT_SECRET=
7 changes: 4 additions & 3 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule Realtime.Integration.RtChannelTest do

alias __MODULE__.Endpoint
alias Extensions.PostgresCdcRls, as: Rls
alias Phoenix.Socket.{V1, Message}
alias Phoenix.Socket.Message
alias Phoenix.Socket.V1
alias Postgrex, as: P
alias Realtime.Api.Tenant
alias Realtime.Database
Expand Down Expand Up @@ -119,7 +120,7 @@ defmodule Realtime.Integration.RtChannelTest do
ref: nil,
topic: ^topic
},
5000
8000

{:ok, _, conn} = Rls.get_manager_conn(@external_id)
P.query!(conn, "insert into test (details) values ('test')", [])
Expand Down Expand Up @@ -1332,7 +1333,7 @@ defmodule Realtime.Integration.RtChannelTest do
config = %{broadcast: %{self: true}, private: false}
realtime_topic = "realtime:#{random_string()}"

for _ <- 1..10 do
for _ <- 1..15 do
WebsocketClient.join(socket, realtime_topic, %{config: config})
end

Expand Down
25 changes: 14 additions & 11 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Realtime.Tenants.ConnectTest do
alias Realtime.UsersCounter

setup do
Cleanup.ensure_no_replication_slot()
tenant = tenant_fixture()
Cleanup.ensure_no_replication_slot()
%{tenant: tenant}
end

Expand All @@ -20,6 +20,7 @@ defmodule Realtime.Tenants.ConnectTest do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(100)
assert is_pid(db_conn)
Connect.shutdown(tenant.external_id)
end

test "on database disconnect, returns new connection", %{tenant: tenant} do
Expand All @@ -33,6 +34,7 @@ defmodule Realtime.Tenants.ConnectTest do
on_exit(fn -> Process.exit(new_conn, :shutdown) end)

assert new_conn != old_conn
Connect.shutdown(tenant.external_id)
end

test "if tenant exists but unable to connect, returns error" do
Expand Down Expand Up @@ -71,13 +73,14 @@ defmodule Realtime.Tenants.ConnectTest do
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 100)

# Not enough time has passed, connection still alive
Process.sleep(500)
Process.sleep(400)
assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)

# Enough time has passed, connection stopped
Process.sleep(1000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
Connect.shutdown(tenant_id)
end

test "if users are connected to a tenant channel, keep the connection", %{
Expand All @@ -94,6 +97,8 @@ defmodule Realtime.Tenants.ConnectTest do
Process.sleep(300)
assert {^pid, %{conn: ^conn_pid}} = :syn.lookup(Connect, tenant_id)
assert Process.alive?(db_conn)

Connect.shutdown(tenant_id)
end

test "connection is killed after user leaving", %{
Expand All @@ -110,6 +115,7 @@ defmodule Realtime.Tenants.ConnectTest do
Process.sleep(1000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
Connect.shutdown(tenant_id)
end

test "error if tenant is suspended" do
Expand All @@ -133,6 +139,7 @@ defmodule Realtime.Tenants.ConnectTest do
Realtime.Tenants.unsuspend_tenant_by_external_id(tenant.external_id)
Process.sleep(50)
assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
Connect.shutdown(tenant.external_id)
end

test "properly handles of failing calls by avoid creating too many connections" do
Expand Down Expand Up @@ -178,13 +185,12 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "starts broadcast handler and does not fail on existing connection", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)

assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(3000)

replication_connection_before = ReplicationConnection.whereis(tenant.external_id)
listen_before = Listen.whereis(tenant.external_id)

assert Process.alive?(replication_connection_before)
assert Process.alive?(listen_before)

Expand All @@ -200,9 +206,8 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "failed broadcast handler and listen recover from failure", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(3000)
Process.sleep(1000)

replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
listen_pid = ReplicationConnection.whereis(tenant.external_id)
Expand All @@ -224,10 +229,9 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "on database disconnect, connection is killed to all components", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
old_pid = Connect.whereis(tenant.external_id)
Process.sleep(3000)
Process.sleep(1000)

old_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
old_listen_connection_pid = Listen.whereis(tenant.external_id)
Expand All @@ -252,15 +256,14 @@ defmodule Realtime.Tenants.ConnectTest do
describe "shutdown/1" do
test "shutdowns all associated connections", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(1000)

assert Process.alive?(db_conn)
Process.sleep(300)
assert Process.alive?(Connect.whereis(tenant.external_id))
assert Process.alive?(ReplicationConnection.whereis(tenant.external_id))
assert Process.alive?(Listen.whereis(tenant.external_id))

Connect.shutdown(tenant.external_id)
Process.sleep(1000)
Process.sleep(200)
refute Connect.whereis(tenant.external_id)
refute ReplicationConnection.whereis(tenant.external_id)
refute Listen.whereis(tenant.external_id)
Expand Down
28 changes: 21 additions & 7 deletions test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

socket = Socket.assign(socket, %{limits: %{@default_limits | max_concurrent_users: 1}})
assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
Expand All @@ -43,7 +43,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

socket_at_capacity =
Socket.assign(socket, %{limits: %{@default_limits | max_concurrent_users: 0}})
Expand All @@ -66,7 +66,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
end
Expand All @@ -77,23 +77,27 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time(), "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert capture_log(fn ->
assert {:error, %{reason: "Token expiration time is invalid"}} =
subscribe_and_join(socket, "realtime:test", %{})

Process.sleep(300)
end) =~ "InvalidJWTExpiration: Token expiration time is invalid"
end

with_mock ChannelsAuthorization, [],
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() - 1, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert capture_log(fn ->
assert {:error, %{reason: "Token expiration time is invalid"}} =
subscribe_and_join(socket, "realtime:test", %{})

Process.sleep(300)
end) =~ "InvalidJWTExpiration: Token expiration time is invalid"
end
end
Expand All @@ -105,6 +109,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :missing_claims} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT"
Expand All @@ -121,6 +127,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :missing_claims} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts)

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT"
Expand All @@ -140,6 +148,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :expired_token} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts)

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago"
Expand All @@ -156,6 +166,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :expired_token} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago"
Expand All @@ -174,7 +186,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
end

test "successful connection proceeds with join" do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())
assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
end

Expand All @@ -198,7 +210,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do
]

tenant = tenant_fixture(%{extensions: extensions})
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant.external_id))

{:ok, %Socket{} = socket} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id))

assert {:error, %{reason: "Realtime was unable to connect to the project database"}} =
subscribe_and_join(socket, "realtime:test", %{})
Expand Down
3 changes: 2 additions & 1 deletion test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.TenantControllerTest do

import Mock

alias Ecto.Adapters.SQL.Sandbox
alias Realtime.Api.Tenant
alias Realtime.Crypto
alias Realtime.Database
Expand All @@ -12,7 +13,6 @@ defmodule RealtimeWeb.TenantControllerTest do
alias Realtime.Tenants.Cache
alias Realtime.Tenants.Connect
alias Realtime.UsersCounter

alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.JwtVerification

Expand Down Expand Up @@ -53,6 +53,7 @@ defmodule RealtimeWeb.TenantControllerTest do
setup %{conn: conn} do
# Ensure no replication slot is present before the test
Cleanup.ensure_no_replication_slot()
Sandbox.checkout(Realtime.Repo)

Application.put_env(:realtime, :db_enc_key, "1234567890123456")

Expand Down

0 comments on commit 5275ddc

Please sign in to comment.