diff --git a/README.md b/README.md index 9861c0f35..35a5bd19c 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,8 @@ This is the list of operational codes that can help you understand your deployme | UnableToFindCounter | Error when trying to find a counter to track rate limits for a tenant | | UnhandledProcessMessage | Unhandled message received by a Realtime process | | UnableToSetPolicies | We were not able to set policies for this connection | +| IncreaseConnectionPool | The number of connections you have set for Realtime are not enough to handle your current use case | +| RlsPolicyError | Error on RLS policy used for authorization | | ConnectionInitializing | Database is initializing connection | | DatabaseConnectionIssue | Database had connection issues and connection was not able to be established | | UnableToConnectToProject | Unable to connect to Project database | diff --git a/config/config.exs b/config/config.exs index cc21b08ad..da725e7b1 100644 --- a/config/config.exs +++ b/config/config.exs @@ -51,7 +51,7 @@ config :tailwind, # Configures Elixir's Logger config :logger, :console, format: "$time $metadata[$level] $message\n", - metadata: [:request_id, :project, :external_id, :application_name] + metadata: [:request_id, :project, :external_id, :application_name, :sub, :error_code] # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason diff --git a/config/test.exs b/config/test.exs index 7912527ae..f57c79836 100644 --- a/config/test.exs +++ b/config/test.exs @@ -43,7 +43,7 @@ config :joken, current_time_adapter: RealtimeWeb.Joken.CurrentTime.Mock # Print only errors during test -config :logger, level: :error +config :logger, level: :warning # Configures Elixir's Logger config :logger, :console, diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index f6dac0669..fcc419eee 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -63,7 +63,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do msg = "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]" - log_error("RealtimeDisabledForConfiguration", msg) + log_warning("RealtimeDisabledForConfiguration", msg) rollback(conn, msg) {:error, exception} -> diff --git a/lib/realtime/logs.ex b/lib/realtime/logs.ex index caf0cceac..8d6574ca8 100644 --- a/lib/realtime/logs.ex +++ b/lib/realtime/logs.ex @@ -17,6 +17,14 @@ defmodule Realtime.Logs do def log_error(code, error, metadata \\ []) do Logger.error("#{code}: #{to_log(error)}", [error_code: code] ++ metadata) end + + @doc """ + Logs warning with a given Operational Code + """ + @spec log_error(String.t(), any(), keyword()) :: :ok + def log_warning(code, warning, metadata \\ []) do + Logger.warning("#{code}: #{to_log(warning)}", [{:error_code, code} | metadata]) + end end defimpl Jason.Encoder, for: DBConnection.ConnectionError do diff --git a/lib/realtime/tenants/authorization.ex b/lib/realtime/tenants/authorization.ex index e8935ce14..ba8fd0946 100644 --- a/lib/realtime/tenants/authorization.ex +++ b/lib/realtime/tenants/authorization.ex @@ -18,7 +18,7 @@ defmodule Realtime.Tenants.Authorization do alias Realtime.Database alias Realtime.Repo alias Realtime.Tenants.Authorization.Policies - + alias DBConnection.ConnectionError defstruct [:topic, :headers, :jwt, :claims, :role] @type t :: %__MODULE__{ @@ -54,14 +54,16 @@ defmodule Realtime.Tenants.Authorization do Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Plug.Conn). """ @spec get_read_authorizations(Socket.t() | Conn.t(), pid(), __MODULE__.t()) :: - {:ok, Socket.t() | Conn.t()} | {:error, any()} + {:ok, Socket.t() | Conn.t()} | {:error, any()} | {:error, :rls_policy_error, any()} def get_read_authorizations(%Socket{} = socket, db_conn, authorization_context) do policies = Map.get(socket.assigns, :policies) || %Policies{} - with {:ok, %Policies{} = policies} <- - get_read_policies_for_connection(db_conn, authorization_context, policies) do - {:ok, Socket.assign(socket, :policies, policies)} + case get_read_policies_for_connection(db_conn, authorization_context, policies) do + {:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)} + {:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error} + {:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool} + {:error, error} -> {:error, error} end end @@ -70,6 +72,8 @@ defmodule Realtime.Tenants.Authorization do case get_read_policies_for_connection(db_conn, authorization_context, policies) do {:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)} + {:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error} + {:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool} {:error, error} -> {:error, error} end end @@ -78,7 +82,9 @@ defmodule Realtime.Tenants.Authorization do Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Conn). """ @spec get_write_authorizations(Socket.t() | Conn.t() | pid(), pid(), __MODULE__.t()) :: - {:ok, Socket.t() | Conn.t() | Policies.t()} | {:error, any()} + {:ok, Socket.t() | Conn.t() | Policies.t()} + | {:error, any()} + | {:error, :rls_policy_error, any()} def get_write_authorizations( %Socket{} = socket, @@ -89,6 +95,8 @@ defmodule Realtime.Tenants.Authorization do case get_write_policies_for_connection(db_conn, authorization_context, policies) do {:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)} + {:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error} + {:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool} {:error, error} -> {:error, error} end end @@ -98,6 +106,8 @@ defmodule Realtime.Tenants.Authorization do case get_write_policies_for_connection(db_conn, authorization_context, policies) do {:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)} + {:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error} + {:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool} {:error, error} -> {:error, error} end end @@ -105,6 +115,8 @@ defmodule Realtime.Tenants.Authorization do def get_write_authorizations(db_conn, db_conn, authorization_context) when is_pid(db_conn) do case get_write_policies_for_connection(db_conn, authorization_context, %Policies{}) do {:ok, %Policies{} = policies} -> {:ok, policies} + {:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error} + {:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool} {:error, error} -> {:error, error} end end @@ -217,13 +229,14 @@ defmodule Realtime.Tenants.Authorization do or_where: [extension: :presence, id: ^presence_id] ) - {:ok, res} = Repo.all(conn, query, Message) - can_presence? = Enum.any?(res, fn %{id: id} -> id == presence_id end) - can_broadcast? = Enum.any?(res, fn %{id: id} -> id == broadcast_id end) + with {:ok, res} <- Repo.all(conn, query, Message) do + can_presence? = Enum.any?(res, fn %{id: id} -> id == presence_id end) + can_broadcast? = Enum.any?(res, fn %{id: id} -> id == broadcast_id end) - policies - |> Policies.update_policies(:presence, :read, can_presence?) - |> Policies.update_policies(:broadcast, :read, can_broadcast?) + policies + |> Policies.update_policies(:presence, :read, can_presence?) + |> Policies.update_policies(:broadcast, :read, can_broadcast?) + end end defp get_write_policy_for_connection_and_extension( @@ -237,11 +250,27 @@ defmodule Realtime.Tenants.Authorization do presence_changeset = Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :presence}) - broadcast_result = Repo.insert(conn, broadcast_changeset, Message, mode: :savepoint) - presence_result = Repo.insert(conn, presence_changeset, Message, mode: :savepoint) + policies = + case Repo.insert(conn, broadcast_changeset, Message, mode: :savepoint) do + {:ok, _} -> + Policies.update_policies(policies, :broadcast, :write, true) - policies - |> Policies.update_policies(:presence, :write, match?({:ok, _}, presence_result)) - |> Policies.update_policies(:broadcast, :write, match?({:ok, _}, broadcast_result)) + {:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} -> + Policies.update_policies(policies, :broadcast, :write, false) + + e -> + e + end + + case Repo.insert(conn, presence_changeset, Message, mode: :savepoint) do + {:ok, _} -> + Policies.update_policies(policies, :presence, :write, true) + + {:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} -> + Policies.update_policies(policies, :presence, :write, false) + + e -> + e + end end end diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 9f979a0c0..b0e506271 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -2,7 +2,7 @@ defmodule Realtime.Tenants.Migrations do @moduledoc """ Run Realtime database migrations for tenant's database. """ - use GenServer + use GenServer, restart: :transient require Logger diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index b28fcc756..5aab7f775 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -104,18 +104,18 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, state, assign(socket, assigns)} else {:error, :expired_token, msg} -> - Logging.log_error_message(:error, "InvalidJWTToken", msg) + Logging.log_error_message(:warning, "InvalidJWTToken", msg) {:error, :missing_claims} -> msg = "Fields `role` and `exp` are required in JWT" - Logging.log_error_message(:error, "InvalidJWTToken", msg) + Logging.log_error_message(:warning, "InvalidJWTToken", msg) {:error, :expected_claims_map} -> msg = "Token claims must be a map" - Logging.log_error_message(:error, "InvalidJWTToken", msg) + Logging.log_error_message(:warning, "InvalidJWTToken", msg) {:error, :unauthorized, msg} -> - Logging.log_error_message(:error, "Unauthorized", msg) + Logging.log_error_message(:warning, "Unauthorized", msg) {:error, :too_many_channels} -> msg = "Too many channels" @@ -129,6 +129,13 @@ defmodule RealtimeWeb.RealtimeChannel do msg = "Too many joins per second" Logging.log_error_message(:error, "ClientJoinRateLimitReached", msg) + {:error, :increase_connection_pool} -> + msg = "Please increase your connection pool size" + Logging.log_error_message(:warning, "IncreaseConnectionPool", msg) + + {:error, :unable_to_set_policies, error} -> + Logging.log_error_message(:warning, "UnableToSetPolicies", error) + {:error, :tenant_database_unavailable} -> Logging.log_error_message( :error, @@ -180,13 +187,6 @@ defmodule RealtimeWeb.RealtimeChannel do "Realtime is restarting, please standby" ) - {:error, :unable_to_set_policies} -> - Logging.log_error_message( - :error, - "UnableToSetPolicies", - "Unable to set policies for connection" - ) - {:error, error} -> Logging.log_error_message(:error, "UnknownErrorOnChannel", error) end @@ -284,7 +284,7 @@ defmodule RealtimeWeb.RealtimeChannel do {:noreply, assign(socket, :pg_sub_ref, nil)} error -> - log_error("UnableToSubscribeToPostgres", error) + log_warning("UnableToSubscribeToPostgres", error) push_system_message("postgres_changes", socket, "error", error, channel_name) {:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))} end @@ -294,13 +294,13 @@ defmodule RealtimeWeb.RealtimeChannel do {:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())} error -> - log_error("UnableToSubscribeToPostgres", error) + log_warning("UnableToSubscribeToPostgres", error) push_system_message("postgres_changes", socket, "error", error, channel_name) {:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))} end rescue error -> - log_error("UnableToSubscribeToPostgres", error) + log_warning("UnableToSubscribeToPostgres", error) push_system_message("postgres_changes", socket, "error", error, channel_name) {:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))} end @@ -585,8 +585,8 @@ defmodule RealtimeWeb.RealtimeChannel do defp shutdown_response(socket, message) when is_binary(message) do %{assigns: %{channel_name: channel_name, access_token: access_token}} = socket metadata = log_metadata(access_token) - log_error("ChannelShutdown", message, metadata) push_system_message("system", socket, "error", message, channel_name) + log_warning("ChannelShutdown", message, metadata) {:stop, :shutdown, socket} end @@ -749,9 +749,17 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, socket} end else + {:error, :increase_connection_pool} -> + {:error, :increase_connection_pool} + + {:error, :rls_policy_error, error} -> + log_error("RlsPolicyError", error) + + {:error, :unauthorized, + "You do not have permissions to read from this Channel topic: #{topic}"} + {:error, error} -> - log_error("UnableToSetPolicies", error) - {:error, :unable_to_set_policies} + {:error, :unable_to_set_policies, error} end end diff --git a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex index ddbf588d3..0b6653e4b 100644 --- a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do """ require Logger import Phoenix.Socket, only: [assign: 3] + import Realtime.Logs alias Phoenix.Socket alias Realtime.GenCounter @@ -29,14 +30,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do } } = socket ) do - {:ok, socket} = run_authorization_check(socket, db_conn, authorization_context) + with {:ok, %{assigns: %{policies: policies}}} <- + run_authorization_check(socket, db_conn, authorization_context) do + case policies do + %Policies{broadcast: %BroadcastPolicies{write: false}} -> + Logger.info("Broadcast message ignored on #{tenant_topic}") - case socket.assigns.policies do - %Policies{broadcast: %BroadcastPolicies{write: false}} -> - Logger.info("Broadcast message ignored on #{tenant_topic}") + _ -> + send_message(self_broadcast, tenant_topic, payload) + end + else + {:error, :increase_connection_pool} -> + log_error("IncreaseConnectionPool", "Please increase your connection pool size") + {:error, :unable_to_set_policies} - _ -> - send_message(self_broadcast, tenant_topic, payload) + {:error, error} -> + log_error("UnableToSetPolicies", error) + {:error, :unable_to_set_policies} end socket = increment_rate_counter(socket) diff --git a/lib/realtime_web/channels/realtime_channel/logging.ex b/lib/realtime_web/channels/realtime_channel/logging.ex index f01bb2142..96e500c76 100644 --- a/lib/realtime_web/channels/realtime_channel/logging.ex +++ b/lib/realtime_web/channels/realtime_channel/logging.ex @@ -31,14 +31,13 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do ) :: {:error, %{reason: binary()}} def log_error_message(level, code, error, metadata \\ []) - def log_error_message(:warning, _code, error, metadata) do - error_msg = "Start channel error: " <> to_log(error) - Logger.warning(error_msg, metadata) - {:error, %{reason: error_msg}} - end - def log_error_message(:error, code, error, metadata) do log_error(code, error, metadata) {:error, %{reason: error}} end + + def log_error_message(:warning, code, error, metadata) do + log_warning(code, error, metadata) + {:error, %{reason: error}} + end end diff --git a/lib/realtime_web/channels/user_socket.ex b/lib/realtime_web/channels/user_socket.ex index cba369586..6fdee146f 100644 --- a/lib/realtime_web/channels/user_socket.ex +++ b/lib/realtime_web/channels/user_socket.ex @@ -87,11 +87,11 @@ defmodule RealtimeWeb.UserSocket do {:error, :tenant_not_found} {:error, :expired_token, msg} -> - log_error_with_token_metadata(msg, token) + log_warning_with_token_metadata(msg, token) {:error, :expired_token} {:error, :missing_claims} -> - log_error_with_token_metadata("Fields `role` and `exp` are required in JWT", token) + log_warning_with_token_metadata("Fields `role` and `exp` are required in JWT", token) {:error, :missing_claims} error -> @@ -108,14 +108,14 @@ defmodule RealtimeWeb.UserSocket do end end - defp log_error_with_token_metadata(msg, token) do + defp log_warning_with_token_metadata(msg, token) do case Joken.peek_claims(token) do {:ok, claims} -> sub = Map.get(claims, "sub") - log_error("InvalidJWTToken", msg, sub: sub) + log_warning("InvalidJWTToken", msg, sub: sub) _ -> - log_error("InvalidJWTToken", msg) + log_warning("InvalidJWTToken", msg) end end end diff --git a/mix.exs b/mix.exs index 0373423fe..2878c637e 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.34.10", + version: "2.34.11", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 6beca50a0..08d15e195 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -24,7 +24,7 @@ defmodule Realtime.Integration.RtChannelTest do @port 4002 @serializer V1.JSONSerializer @external_id "dev_tenant" - @uri "ws://#{@external_id}.localhost:#{@port}/socket/websocket?vsn=1.0.0" + @uri "ws://#{@external_id}.localhost:#{@port}/socket/websocket" @secret "secure_jwt_secret" Application.put_env(:phoenix, Endpoint, @@ -365,21 +365,23 @@ defmodule Realtime.Integration.RtChannelTest do topic = "realtime:#{topic}" {socket, _} = get_connection("authenticated") - assert capture_log(fn -> - WebsocketClient.join(socket, topic, %{config: config}) - Process.sleep(500) - end) =~ "Unauthorized: #{expected}" + log = + capture_log(fn -> + WebsocketClient.join(socket, topic, %{config: config}) - assert_receive %Message{ - topic: ^topic, - event: "phx_reply", - payload: %{"response" => %{"reason" => reason}, "status" => "error"} - }, - 1000 + assert_receive %Message{ + topic: ^topic, + event: "phx_reply", + payload: %{"response" => %{"reason" => reason}, "status" => "error"} + }, + 1000 + + assert reason == expected + refute_receive %Message{event: "phx_reply", topic: ^topic}, 1000 + refute_receive %Message{event: "presence_state"}, 1000 + end) - assert reason == expected - refute_receive %Message{event: "phx_reply", topic: ^topic}, 1000 - refute_receive %Message{event: "presence_state"}, 1000 + assert log =~ "Unauthorized: #{expected}" end end @@ -657,21 +659,24 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "presence_state"}, 500 {:ok, token} = generate_token(%{:exp => System.system_time(:second) - 1000, sub: sub}) - assert capture_log(fn -> - WebsocketClient.send_event(socket, realtime_topic, "access_token", %{ - "access_token" => token - }) - - assert_receive %Message{ - topic: ^realtime_topic, - event: "system", - payload: %{ - "extension" => "system", - "message" => "Token as expired 1000 seconds ago", - "status" => "error" - } - } - end) =~ "ChannelShutdown: Token as expired 1000 seconds ago" + log = + capture_log(fn -> + WebsocketClient.send_event(socket, realtime_topic, "access_token", %{ + "access_token" => token + }) + + assert_receive %Message{ + topic: ^realtime_topic, + event: "system", + payload: %{ + "extension" => "system", + "message" => "Token as expired 1000 seconds ago", + "status" => "error" + } + } + end) + + assert log =~ "ChannelShutdown: Token as expired 1000 seconds ago" end test "ChannelShutdown include sub if available in jwt claims", @@ -679,7 +684,7 @@ defmodule Realtime.Integration.RtChannelTest do sub = random_string() {socket, access_token} = - get_connection("authenticated", %{sub: sub}) + get_connection("authenticated", %{sub: sub}, %{log_level: :warning}) config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -841,7 +846,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply"}, 500 assert_receive %Message{event: "presence_state"}, 500 - # token beconmes a string in between joins so it needs to be handled by the channel and not the socket + # token becomes a string in between joins so it needs to be handled by the channel and not the socket Process.sleep(1000) realtime_topic = "realtime:#{topic}" WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: "potato"}) @@ -1201,9 +1206,12 @@ defmodule Realtime.Integration.RtChannelTest do end test "invalid JWT with expired token" do - assert capture_log(fn -> - get_connection("authenticated", %{:exp => System.system_time(:second) - 1000}) - end) =~ "InvalidJWTToken: Token as expired 1000 seconds ago" + log = + capture_log(fn -> + get_connection("authenticated", %{:exp => System.system_time(:second) - 1000}) + end) + + assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago" end end @@ -1343,6 +1351,40 @@ defmodule Realtime.Integration.RtChannelTest do end end + describe "authorization handling" do + setup [:rls_context] + + @tag role: "authenticated", + policies: [:broken_read_presence, :broken_write_presence] + test "handle failing rls policy" do + {socket, _} = get_connection("authenticated") + config = %{broadcast: %{self: true}, private: true} + topic = random_string() + realtime_topic = "realtime:#{topic}" + + log = + capture_log(fn -> + WebsocketClient.join(socket, realtime_topic, %{config: config}) + + msg = "You do not have permissions to read from this Channel topic: #{topic}" + + assert_receive %Message{ + event: "phx_reply", + payload: %{ + "response" => %{"reason" => ^msg}, + "status" => "error" + } + }, + 500 + + refute_receive %Message{event: "phx_reply"} + refute_receive %Message{event: "presence_state"} + end) + + assert log =~ "RlsPolicyError" + end + end + test "handle empty topic by closing the socket" do {socket, _} = get_connection("authenticated") config = %{broadcast: %{self: true}, private: false} @@ -1380,10 +1422,17 @@ defmodule Realtime.Integration.RtChannelTest do {:ok, generate_jwt_token(@secret, claims)} end - defp get_connection(role \\ "anon", claims \\ %{}) do + defp get_connection( + role \\ "anon", + claims \\ %{}, + params \\ %{vsn: "1.0.0", log_level: :warning} + ) do + params = Enum.reduce(params, "", fn {k, v}, acc -> "#{acc}&#{k}=#{v}" end) + uri = "#{@uri}?#{params}" + with {:ok, token} <- token_valid(role, claims), {:ok, socket} <- - WebsocketClient.connect(self(), @uri, @serializer, [{"x-api-key", token}]) do + WebsocketClient.connect(self(), uri, @serializer, [{"x-api-key", token}]) do {socket, token} end end diff --git a/test/realtime/tenants/authorization_test.exs b/test/realtime/tenants/authorization_test.exs index bd066ccb3..83688ccfe 100644 --- a/test/realtime/tenants/authorization_test.exs +++ b/test/realtime/tenants/authorization_test.exs @@ -205,6 +205,99 @@ defmodule Realtime.Tenants.AuthorizationTest do end end + describe "database error" do + @tag role: "authenticated", + policies: [ + :authenticated_read_broadcast_and_presence, + :authenticated_write_broadcast_and_presence + ], + timeout: :timer.minutes(2) + test "handles small pool size", context do + task = + Task.async(fn -> + Postgrex.query!(context.db_conn, "SELECT pg_sleep(59)", [], timeout: :timer.minutes(1)) + end) + + Process.sleep(100) + + assert {:error, :increase_connection_pool} = + Authorization.get_read_authorizations( + Phoenix.ConnTest.build_conn(), + context.db_conn, + context.authorization_context + ) + + assert {:error, :increase_connection_pool} = + Authorization.get_write_authorizations( + Phoenix.ConnTest.build_conn(), + context.db_conn, + context.authorization_context + ) + + assert {:error, :increase_connection_pool} = + Authorization.get_read_authorizations( + Phoenix.ChannelTest.socket(RealtimeWeb.UserSocket), + context.db_conn, + context.authorization_context + ) + + assert {:error, :increase_connection_pool} = + Authorization.get_write_authorizations( + Phoenix.ChannelTest.socket(RealtimeWeb.UserSocket), + context.db_conn, + context.authorization_context + ) + + assert {:error, :increase_connection_pool} = + Authorization.get_write_authorizations( + context.db_conn, + context.db_conn, + context.authorization_context + ) + + Task.await(task, :timer.minutes(1)) + end + + @tag role: "authenticated", + policies: [:broken_read_presence, :broken_write_presence] + test "broken RLS policy sets policies to false and shows error to user", context do + assert {:error, :rls_policy_error, %Postgrex.Error{}} = + Authorization.get_read_authorizations( + Phoenix.ConnTest.build_conn(), + context.db_conn, + context.authorization_context + ) + + assert {:error, :rls_policy_error, %Postgrex.Error{}} = + Authorization.get_write_authorizations( + Phoenix.ConnTest.build_conn(), + context.db_conn, + context.authorization_context + ) + + assert {:error, :rls_policy_error, %Postgrex.Error{}} = + Authorization.get_read_authorizations( + Phoenix.ChannelTest.socket(RealtimeWeb.UserSocket), + context.db_conn, + context.authorization_context + ) + + assert {:error, :rls_policy_error, %Postgrex.Error{}} = + Authorization.get_write_authorizations( + Phoenix.ChannelTest.socket(RealtimeWeb.UserSocket), + context.db_conn, + context.authorization_context + ) + + assert {:error, :rls_policy_error, %Postgrex.Error{}} = + Authorization.get_write_authorizations( + context.db_conn, + context.db_conn, + context.authorization_context + ) + end + end + describe "ensure database stays clean" do @tag role: "authenticated", policies: [ @@ -275,7 +368,7 @@ defmodule Realtime.Tenants.AuthorizationTest do def rls_context(context) do start_supervised!(CurrentTime.Mock) tenant = tenant_fixture() - Migrations.run_migrations(tenant) + :ok = Migrations.run_migrations(tenant) {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) @@ -300,7 +393,7 @@ defmodule Realtime.Tenants.AuthorizationTest do Realtime.Tenants.Migrations.create_partitions(db_conn) - on_exit(fn -> Process.exit(db_conn, :normal) end) + on_exit(fn -> Process.exit(db_conn, :kill) end) %{ tenant: tenant, diff --git a/test/realtime_web/channels/realtime_channel/logging_test.exs b/test/realtime_web/channels/realtime_channel/logging_test.exs index 185c19064..dfd7fb0b3 100644 --- a/test/realtime_web/channels/realtime_channel/logging_test.exs +++ b/test/realtime_web/channels/realtime_channel/logging_test.exs @@ -29,10 +29,10 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do describe "log_error_message/3" do test "handles warning level errors" do - assert capture_log(fn -> - result = Logging.log_error_message(:warning, :test_code, "test error") - assert {:error, %{reason: "Start channel error: test error"}} = result - end) =~ "Start channel error: test error" + assert capture_log([level: :warning], fn -> + result = Logging.log_error_message(:warning, "TestError", "test error") + assert {:error, %{reason: "test error"}} = result + end) =~ "TestError: test error" end test "handles error level errors" do diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 2b5287c7c..454a0793e 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -10,7 +10,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do alias RealtimeWeb.UserSocket alias RealtimeWeb.Joken.CurrentTime - @tenant "dev_tenant" + @tenant_external_id "dev_tenant" @default_limits %{ max_concurrent_users: 200, @@ -27,14 +27,10 @@ defmodule RealtimeWeb.RealtimeChannelTest do describe "maximum number of connected clients per tenant" do test "not reached" do - with_mocks([ - {ChannelsAuthorization, [], - [ - authorize_conn: fn _, _, _ -> - {:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}} - end - ]} - ]) do + with_mock ChannelsAuthorization, [], + authorize_conn: fn _, _, _ -> + {:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}} + end do {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts()) socket = Socket.assign(socket, %{limits: %{@default_limits | max_concurrent_users: 1}}) @@ -43,14 +39,10 @@ defmodule RealtimeWeb.RealtimeChannelTest do end test "reached" do - with_mocks([ - {ChannelsAuthorization, [], - [ - authorize_conn: fn _, _, _ -> - {:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}} - end - ]} - ]) do + with_mock ChannelsAuthorization, [], + authorize_conn: fn _, _, _ -> + {:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}} + end do {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts()) socket_at_capacity = @@ -70,14 +62,10 @@ defmodule RealtimeWeb.RealtimeChannelTest do describe "JWT token validations" do test "token has valid expiration" do - with_mocks([ - {ChannelsAuthorization, [], - [ - authorize_conn: fn _, _, _ -> - {:ok, %{"exp" => Joken.current_time() + 1, "role" => "postgres"}} - end - ]} - ]) do + with_mock ChannelsAuthorization, [], + authorize_conn: fn _, _, _ -> + {:ok, %{"exp" => Joken.current_time() + 1, "role" => "postgres"}} + end do {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts()) assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{}) @@ -85,14 +73,10 @@ defmodule RealtimeWeb.RealtimeChannelTest do end test "token has invalid expiration" do - with_mocks([ - {ChannelsAuthorization, [], - [ - authorize_conn: fn _, _, _ -> - {:ok, %{"exp" => Joken.current_time(), "role" => "postgres"}} - end - ]} - ]) do + with_mock ChannelsAuthorization, [], + authorize_conn: fn _, _, _ -> + {:ok, %{"exp" => Joken.current_time(), "role" => "postgres"}} + end do {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts()) assert capture_log(fn -> @@ -101,14 +85,10 @@ defmodule RealtimeWeb.RealtimeChannelTest do end) =~ "InvalidJWTExpiration: Token expiration time is invalid" end - with_mocks([ - {ChannelsAuthorization, [], - [ - authorize_conn: fn _, _, _ -> - {:ok, %{"exp" => Joken.current_time() - 1, "role" => "postgres"}} - end - ]} - ]) do + with_mock ChannelsAuthorization, [], + authorize_conn: fn _, _, _ -> + {:ok, %{"exp" => Joken.current_time() - 1, "role" => "postgres"}} + end do {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts()) assert capture_log(fn -> @@ -123,7 +103,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do authorize_conn: fn _, _, _ -> {:error, :missing_claims} end do log = capture_log(fn -> - assert {:error, :missing_claims} = connect(UserSocket, %{}, conn_opts()) + assert {:error, :missing_claims} = + connect(UserSocket, %{"log_level" => "warning"}, conn_opts()) end) assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT" @@ -134,10 +115,12 @@ defmodule RealtimeWeb.RealtimeChannelTest do with_mock ChannelsAuthorization, [], authorize_conn: fn _, _, _ -> {:error, :missing_claims} end do sub = random_string() + conn_opts = conn_opts(@tenant_external_id, %{sub: sub}) log = capture_log(fn -> - assert {:error, :missing_claims} = connect(UserSocket, %{}, conn_opts(%{sub: sub})) + assert {:error, :missing_claims} = + connect(UserSocket, %{"log_level" => "warning"}, conn_opts) end) assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT" @@ -151,10 +134,12 @@ defmodule RealtimeWeb.RealtimeChannelTest do {:error, :expired_token, "InvalidJWTToken: Token as expired 1000 seconds ago"} end do sub = random_string() + conn_opts = conn_opts(@tenant_external_id, %{sub: sub}) log = capture_log(fn -> - assert {:error, :expired_token} = connect(UserSocket, %{}, conn_opts(%{sub: sub})) + assert {:error, :expired_token} = + connect(UserSocket, %{"log_level" => "warning"}, conn_opts) end) assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago" @@ -169,7 +154,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do end do log = capture_log(fn -> - assert {:error, :expired_token} = connect(UserSocket, %{}, conn_opts()) + assert {:error, :expired_token} = + connect(UserSocket, %{"log_level" => "warning"}, conn_opts()) end) assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago" @@ -212,25 +198,17 @@ defmodule RealtimeWeb.RealtimeChannelTest do ] tenant = tenant_fixture(%{extensions: extensions}) - - conn_opts = [ - connect_info: %{ - uri: %{host: "#{tenant.external_id}.localhost:4000/socket/websocket", query: ""}, - x_headers: [{"x-api-key", "token123"}] - } - ] - - {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant.external_id)) assert {:error, %{reason: "Realtime was unable to connect to the project database"}} = subscribe_and_join(socket, "realtime:test", %{}) end end - defp conn_opts(claims \\ %{}) do + defp conn_opts(tenant_id \\ @tenant_external_id, claims \\ %{}) do [ connect_info: %{ - uri: %{host: "#{@tenant}.localhost:4000/socket/websocket", query: ""}, + uri: URI.parse("https://#{tenant_id}.localhost:4000/socket/websocket"), x_headers: [{"x-api-key", generate_jwt_token("secret", claims)}] } ] diff --git a/test/support/generators.ex b/test/support/generators.ex index f01961e74..2c4bf6041 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -120,6 +120,16 @@ defmodule Generators do * write_broadcast - Sets write broadcast policy for authenticated role """ def create_rls_policies(conn, policies, params) do + query = """ + CREATE OR REPLACE FUNCTION test_log_error() RETURNS boolean AS $$ + BEGIN + RAISE EXCEPTION 'test error'; + RETURN TRUE; + END$$ LANGUAGE plpgsql; + """ + + Postgrex.query!(conn, query, []) + Enum.each(policies, fn policy -> query = policy_query(policy, params) Postgrex.query!(conn, query, []) @@ -218,6 +228,24 @@ defmodule Generators do """ end + def policy_query(:broken_read_presence, _) do + """ + CREATE POLICY "authenticated_read_presence" + ON realtime.messages FOR SELECT + TO authenticated + USING ( (SELECT test_log_error()) ); + """ + end + + def policy_query(:broken_write_presence, _) do + """ + CREATE POLICY "authenticated_write_presence" + ON realtime.messages FOR INSERT + TO authenticated + WITH CHECK ( (SELECT test_log_error()) ); + """ + end + def generate_jwt_token(secret, claims) do signer = Joken.Signer.create("HS256", secret) {:ok, claims} = Joken.generate_claims(%{}, claims)