Skip to content

Commit

Permalink
fix: Add sub to log metadata; adds tests to rate limits (#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Jan 24, 2025
1 parent fa457b1 commit 5c0e68b
Show file tree
Hide file tree
Showing 9 changed files with 431 additions and 72 deletions.
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ config :joken,

# Print only errors during test
config :logger, level: :error

# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id, :project, :external_id, :application_name, :sub]
3 changes: 2 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ defmodule Realtime.Tenants.Connect do
end

@impl true
def terminate(_, %{tenant_id: tenant_id}) do
def terminate(reason, %{tenant_id: tenant_id}) do
Logger.info("Tenant #{tenant_id} has been terminated: #{inspect(reason)}")
Realtime.MetricsCleaner.delete_metric(tenant_id)
:ok
end
Expand Down
58 changes: 35 additions & 23 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,15 @@ defmodule RealtimeWeb.RealtimeChannel do

{:ok, state, assign(socket, assigns)}
else
{:error, :unauthorized, msg} ->
Logging.log_error_message(:error, "Unauthorized", msg)

{:error, :expired_token, msg} ->
Logging.log_error_message(:error, "InvalidJWTToken", msg)

{:error, [message: "Invalid token", claim: claim, claim_val: value]} ->
msg = "Invalid value for JWT claim #{inspect(claim)} with value #{inspect(value)}"
{:error, :missing_claims} ->
msg = "Fields `role` and `exp` are required in JWT"
Logging.log_error_message(:error, "InvalidJWTToken", msg)

{:error, :expected_claims_map} ->
msg = "Token claims must be a map"
Logging.log_error_message(:error, "InvalidJWTToken", msg)

{:error, :too_many_channels} ->
Expand Down Expand Up @@ -370,20 +371,18 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, socket}
end

def handle_in(
"access_token",
%{"access_token" => refresh_token},
%{
assigns: %{
access_token: access_token,
pg_sub_ref: pg_sub_ref,
db_conn: db_conn,
channel_name: channel_name,
pg_change_params: pg_change_params
}
} = socket
)
def handle_in("access_token", %{"access_token" => refresh_token}, socket)
when is_binary(refresh_token) do
%{
assigns: %{
access_token: access_token,
pg_sub_ref: pg_sub_ref,
db_conn: db_conn,
channel_name: channel_name,
pg_change_params: pg_change_params
}
} = socket

socket = assign(socket, :access_token, refresh_token)

with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
Expand Down Expand Up @@ -485,7 +484,7 @@ defmodule RealtimeWeb.RealtimeChannel do
def limit_channels(%{assigns: %{tenant: tenant, limits: limits}, transport_pid: pid}) do
key = Tenants.channels_per_client_key(tenant)

if Registry.count_match(Realtime.Registry, key, pid) > limits.max_channels_per_client do
if Registry.count_match(Realtime.Registry, key, pid) + 1 > limits.max_channels_per_client do
{:error, :too_many_channels}
else
Registry.register(Realtime.Registry, Tenants.channels_per_client_key(tenant), pid)
Expand Down Expand Up @@ -580,9 +579,10 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp shutdown_response(%{assigns: %{channel_name: channel_name}} = socket, message)
when is_binary(message) do
log_error("ChannelShutdown", message)
defp shutdown_response(socket, message) when is_binary(message) do
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
metadata = log_metadata(access_token)
log_error("ChannelShutdown", message, metadata)
push_system_message("system", socket, "error", message, channel_name)
{:stop, :shutdown, socket}
end
Expand Down Expand Up @@ -756,7 +756,6 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, assign(socket, policies: nil)}
end

@spec only_private?(String.t(), map()) :: :ok | {:error, :private_only}
defp only_private?(tenant_id, %{assigns: %{check_authorization?: check_authorization?}}) do
tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)

Expand All @@ -765,4 +764,17 @@ defmodule RealtimeWeb.RealtimeChannel do
true -> :ok
end
end

defp log_metadata(access_token) do
access_token
|> Joken.peek_claims()
|> then(fn
{:ok, claims} -> Map.get(claims, "sub")
_ -> nil
end)
|> then(fn
nil -> []
sub -> [sub: sub]
end)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
defp increment_rate_counter(%{assigns: %{rate_counter: counter}} = socket) do
GenCounter.add(counter.id)
{:ok, rate_counter} = RateCounter.get(counter.id)

assign(socket, :rate_counter, rate_counter)
end

Expand Down
16 changes: 12 additions & 4 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,22 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
@doc """
Logs errors in an expected format
"""
def log_error_message(:warning, _code, error) do
@spec log_error_message(
level :: :error | :warning,
code :: binary(),
error :: term(),
keyword()
) :: {:error, %{reason: binary()}}
def log_error_message(level, code, error, metadata \\ [])

def log_error_message(:warning, _code, error, metadata) do
error_msg = "Start channel error: " <> to_log(error)
Logger.warning(error_msg)
Logger.warning(error_msg, metadata)
{:error, %{reason: error_msg}}
end

def log_error_message(:error, code, error) do
log_error(code, error)
def log_error_message(:error, code, error, metadata) do
log_error(code, error, metadata)
{:error, %{reason: error}}
end
end
28 changes: 20 additions & 8 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ defmodule RealtimeWeb.UserSocket do

@default_log_level "error"

@impl true
def id(%{assigns: %{tenant: tenant}}), do: subscribers_id(tenant)

@spec subscribers_id(String.t()) :: String.t()
def subscribers_id(tenant), do: "user_socket:" <> tenant

@impl true
def connect(params, socket, opts) do
if Application.fetch_env!(:realtime, :secure_channels) do
Expand All @@ -36,6 +42,7 @@ defmodule RealtimeWeb.UserSocket do

Logger.metadata(external_id: external_id, project: external_id)
Logger.put_process_level(self(), log_level)
token = access_token(params, headers)

with %Tenant{
extensions: extensions,
Expand All @@ -48,7 +55,7 @@ defmodule RealtimeWeb.UserSocket do
max_channels_per_client: max_channels_per_client,
postgres_cdc_default: postgres_cdc_default
} <- Tenants.Cache.get_tenant_by_external_id(external_id),
token when is_binary(token) <- access_token(params, headers),
token when is_binary(token) <- token,
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
Expand Down Expand Up @@ -80,11 +87,11 @@ defmodule RealtimeWeb.UserSocket do
{:error, :tenant_not_found}

{:error, :expired_token, msg} ->
log_error("InvalidJWTToken", msg)
log_error_with_token_metadata(msg, token)
{:error, :expired_token}

{:error, :missing_claims} ->
log_error("InvalidJWTToken", "Fields `role` and `exp` are required in JWT")
log_error_with_token_metadata("Fields `role` and `exp` are required in JWT", token)
{:error, :missing_claims}

error ->
Expand All @@ -94,16 +101,21 @@ defmodule RealtimeWeb.UserSocket do
end
end

def access_token(params, headers) do
defp access_token(params, headers) do
case :proplists.lookup("x-api-key", headers) do
:none -> Map.get(params, "apikey")
{"x-api-key", token} -> token
end
end

@impl true
def id(%{assigns: %{tenant: tenant}}), do: subscribers_id(tenant)
defp log_error_with_token_metadata(msg, token) do
case Joken.peek_claims(token) do
{:ok, claims} ->
sub = Map.get(claims, "sub")
log_error("InvalidJWTToken", msg, sub: sub)

@spec subscribers_id(String.t()) :: String.t()
def subscribers_id(tenant), do: "user_socket:" <> tenant
_ ->
log_error("InvalidJWTToken", msg)
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.9",
version: "2.34.10",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit 5c0e68b

Please sign in to comment.