Skip to content

Commit

Permalink
fix: on new keys, disconnect user socket
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Feb 5, 2025
1 parent 37995f6 commit 2be2cc9
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 232 deletions.
7 changes: 5 additions & 2 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ defmodule Realtime.Api do
data: %{external_id: external_id}
})
when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
IO.inspect("Triggering disconnect for #{"user_socket:" <> external_id}")

Check warning on line 127 in lib/realtime/api.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.

RealtimeWeb.Endpoint.broadcast("user_socket:" <> external_id, "disconnect", %{}) |> IO.inspect()

Check warning on line 129 in lib/realtime/api.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
end

defp maybe_trigger_disconnect(_), do: nil
Expand Down Expand Up @@ -198,7 +200,8 @@ defmodule Realtime.Api do
{value, settings} = Map.pop(extension.settings, from)
new_settings = Map.put(settings, to, value)

Ecto.Changeset.cast(extension, %{settings: new_settings}, [:settings])
extension
|> Ecto.Changeset.cast(%{settings: new_settings}, [:settings])
|> Repo.update!()
end
end
Expand Down
15 changes: 6 additions & 9 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ defmodule RealtimeWeb.RealtimeChannel do
Realtime.UsersCounter.add(transport_pid, tenant_id)
RealtimeWeb.Endpoint.subscribe(tenant_topic)
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
Process.monitor(transport_pid)

pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)

Expand Down Expand Up @@ -196,6 +197,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

@impl true
def handle_info(
_any,
%{
Expand All @@ -211,27 +213,22 @@ defmodule RealtimeWeb.RealtimeChannel do
shutdown_response(socket, message)
end

@impl true

def handle_info(:sync_presence = msg, socket) do
PresenceHandler.track(msg, socket)
end

@impl true
def handle_info(%{event: "postgres_cdc_rls_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(%{event: "postgres_cdc_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(
%{event: type, payload: payload} = msg,
%{assigns: %{policies: policies}} = socket
Expand Down Expand Up @@ -262,7 +259,6 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, socket}
end

@impl true
def handle_info(:postgres_subscribe, %{assigns: %{channel_name: channel_name}} = socket) do
%{
assigns: %{
Expand Down Expand Up @@ -309,7 +305,6 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end

@impl true
def handle_info(:confirm_token, %{assigns: %{pg_change_params: pg_change_params}} = socket) do
case confirm_token(socket) do
{:ok, claims, confirm_token_ref, _, _} ->
Expand All @@ -332,13 +327,15 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

def handle_info(:disconnect, %{assigns: %{channel_name: channel_name}} = socket) do
def handle_info(%{event: "phx_leave"}, %{assigns: %{channel_name: channel_name}} = socket) do
IO.inspect("!!!!")

Check warning on line 331 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
Logger.info("Received operational call to disconnect channel")
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
{:stop, :shutdown, socket}
{:stop, {:shutdown, :left}, socket}
end

def handle_info(msg, socket) do
IO.inspect(msg)

Check warning on line 338 in lib/realtime_web/channels/realtime_channel.ex

View workflow job for this annotation

GitHub Actions / Tests

There should be no calls to `IO.inspect/1`.
log_error("UnhandledSystemMessage", msg)
{:noreply, socket}
end
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
"open_api_spex": {:hex, :open_api_spex, "3.21.2", "6a704f3777761feeb5657340250d6d7332c545755116ca98f33d4b875777e1e5", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "f42ae6ed668b895ebba3e02773cfb4b41050df26f803f2ef634c72a7687dc387"},
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
"phoenix": {:hex, :phoenix, "1.7.18", "5310c21443514be44ed93c422e15870aef254cf1b3619e4f91538e7529d2b2e4", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "1797fcc82108442a66f2c77a643a62980f342bfeb63d6c9a515ab8294870004e"},
"phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},
Expand Down
64 changes: 64 additions & 0 deletions test/integration/integration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Integration do
import Generators

alias Realtime.Database
alias Realtime.Integration.WebsocketClient
alias Phoenix.Socket.V1
alias Realtime.Database
alias Realtime.Integration.WebsocketClient

@serializer V1.JSONSerializer
@secret "secure_jwt_secret"
@external_id "dev_tenant"
defp uri(port), do: "ws://#{@external_id}.localhost:#{port}/socket/websocket"
def token_valid(role, claims \\ %{}), do: generate_token(Map.put(claims, :role, role))
def token_no_role, do: generate_token()

def generate_token(claims \\ %{}) do
claims =
Map.merge(
%{
ref: "localhost",
iat: System.system_time(:second),
exp: System.system_time(:second) + 604_800
},
claims
)

{:ok, generate_jwt_token(@secret, claims)}
end

def get_connection(port, role \\ "anon", claims \\ %{}, params \\ %{vsn: "1.0.0", log_level: :warning}) do
params = Enum.reduce(params, "", fn {k, v}, acc -> "#{acc}&#{k}=#{v}" end)
uri = "#{uri(port)}?#{params}"

with {:ok, token} <- token_valid(role, claims),
{:ok, socket} <-
WebsocketClient.connect(self(), uri, @serializer, [{"x-api-key", token}]) do
{socket, token}
end
end

def rls_context(%{tenant: tenant} = context) do
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)

clean_table(db_conn, "realtime", "messages")
topic = Map.get(context, :topic, random_string())
message = message_fixture(tenant, %{topic: topic})

if policies = context[:policies] do
create_rls_policies(db_conn, policies, message)
end

Map.put(context, :topic, message.topic)
end

def change_tenant_configuration(limit, value) do
@external_id
|> Realtime.Tenants.get_tenant_by_external_id()
|> Realtime.Api.Tenant.changeset(%{limit => value})
|> Realtime.Repo.update!()

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)
end
end
Loading

0 comments on commit 2be2cc9

Please sign in to comment.