Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A draft of simplifying the service implementation #145

Merged
merged 3 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions bench/service_bench.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule EchoService do
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a benchmark to try to measure the impact of these changes, but it turns out this branch only make a ~5% improvement in throughput.

We can get around 13.7k service messages per second on both the main branch and about 14.5k service messages per second on this branch. 🤷

use Gnat.Services.Server

def request(%{body: body}, "echo", _group) do
{:reply, body}
end

def definition do
%{
name: "echo",
description: "This is an example service",
version: "0.0.1",
endpoints: [
%{
name: "echo",
group_name: "mygroup",
}
]
}
end
end

conn_supervisor_settings = %{
name: :gnat, # (required) the registered named you want to give the Gnat connection
backoff_period: 1_000, # number of milliseconds to wait between consecutive reconnect attempts (default: 2_000)
connection_settings: [
%{host: '127.0.0.1', port: 4222},
]
}
{:ok, _pid} = Gnat.ConnectionSupervisor.start_link(conn_supervisor_settings)

# let the connection get established
:timer.sleep(100)

consumer_supervisor_settings = %{
connection_name: :gnat,
module: EchoService, # a module that implements the Gnat.Services.Server behaviour
service_definition: EchoService.definition()
}

{:ok, _pid} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)

# wait for the connection and consumer to be ready
:timer.sleep(2000)

{:ok, client_pid} = Gnat.start_link(%{host: '127.0.0.1', port: 4222})

inputs = %{
"16 byte" => :crypto.strong_rand_bytes(16),
"256 byte" => :crypto.strong_rand_bytes(256),
"1024 byte" => :crypto.strong_rand_bytes(1024),
}

Benchee.run(%{
"service" => fn(msg) -> {:ok, %{body: ^msg}} = Gnat.request(client_pid, "mygroup.echo", msg) end,
}, time: 10, parallel: 1, inputs: inputs, formatters: [{Benchee.Formatters.Console, comparisons: false}])
111 changes: 63 additions & 48 deletions lib/gnat/consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
defmodule Gnat.ConsumerSupervisor do
# required default, see https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md#request-handling
@default_service_queue_group "q"

use GenServer
require Logger

alias Gnat.Services.Service

@moduledoc """
A process that can supervise consumers for you
Expand Down Expand Up @@ -62,7 +59,6 @@ defmodule Gnat.ConsumerSupervisor do
It's also possible to pass a `%{consuming_function: {YourModule, :your_function}}` rather than a `:module` in your settings.
In that case no error handling or replying is taking care of for you, microservices cannot be used, and it will be up to your function to take whatever action you want with each message.
"""
alias Gnat.Services.ServiceResponder
@spec start_link(map(), keyword()) :: GenServer.on_start
def start_link(settings, options \\ []) do
GenServer.start_link(__MODULE__, settings, options)
Expand All @@ -75,7 +71,6 @@ defmodule Gnat.ConsumerSupervisor do
{:ok, task_supervisor_pid} = Task.Supervisor.start_link()
connection_name = Map.get(settings, :connection_name)
subscription_topics = Map.get(settings, :subscription_topics)
microservice = Map.get(settings, :service_definition)

state = %{
connection_name: connection_name,
Expand All @@ -84,23 +79,15 @@ defmodule Gnat.ConsumerSupervisor do
status: :disconnected,
subscription_topics: subscription_topics,
subscriptions: [],
task_supervisor_pid: task_supervisor_pid,
service_config: microservice,
task_supervisor_pid: task_supervisor_pid
}

cond do
Map.has_key?(settings, :module) ->
state = Map.put(state, :module, settings.module)
send self(), :connect
{:ok, state}

Map.has_key?(settings, :consuming_function) ->
state = Map.put(state, :consuming_function, settings.consuming_function)
send self(), :connect
{:ok, state}

true ->
{:error, "You must provide a module or consuming function for the consumer supervisor"}
with {:ok, state} <- maybe_append_service(state, settings),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if you'll like this change. The cond block was a bit tricky to read and I wanted to validate the user-input about the service definition during process startup rather than crashing the process later when the connection became available.

So I used a with block here to show the happy path, and I perform all of the service configuration validation right here (as well as initializing service counters etc). This way our stats will persist across re-connects and if the user has an invalid validation they'll get a helpful error at boot time rather than a process crash later

{:ok, state} <- maybe_append_module(state, settings),
{:ok, state} <- maybe_append_consuming_function(state, settings),
:ok <- validate_state(state) do
send self(), :connect
{:ok, state}
end
end

Expand All @@ -112,13 +99,9 @@ defmodule Gnat.ConsumerSupervisor do
{:noreply, state}
connection_pid ->
_ref = Process.monitor(connection_pid)
state = if Map.get(state, :service_config) do
initialize_as_microservice(state, connection_pid)
else
initialize_as_manual_consumer(state, connection_pid)
end
subscriptions = subscribe_to_topics(state, connection_pid)

{:noreply, %{state | status: :connected, connection_pid: connection_pid}}
{:noreply, %{state | status: :connected, connection_pid: connection_pid, subscriptions: subscriptions}}
end
end

Expand All @@ -134,12 +117,14 @@ defmodule Gnat.ConsumerSupervisor do
{:noreply, Map.put(state, :task_supervisor_pid, task_supervisor_pid)}
end

def handle_info({:msg, gnat_message}, %{service: service, module: module} = state) do
Task.Supervisor.async_nolink(state.task_supervisor_pid, Gnat.Services.Server, :execute, [module, gnat_message, service])

{:noreply, state}
end

def handle_info({:msg, gnat_message}, %{module: module} = state) do
if Map.get(state, :service_config) do
Task.Supervisor.async_nolink(state.task_supervisor_pid, Gnat.Services.Server, :execute, [module, gnat_message, state.svc_responder_pid])
else
Task.Supervisor.async_nolink(state.task_supervisor_pid, Gnat.Server, :execute, [module, gnat_message])
end
Task.Supervisor.async_nolink(state.task_supervisor_pid, Gnat.Server, :execute, [module, gnat_message])

{:noreply, state}
end
Expand Down Expand Up @@ -189,35 +174,65 @@ defmodule Gnat.ConsumerSupervisor do
end
end

defp initialize_as_manual_consumer(state, connection_pid) do
subscriptions = Enum.map(state.subscription_topics, fn(topic_and_queue_group) ->
defp subscribe_to_topics(%{service: service}, connection_pid) do
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally prefer to do pattern matching in my function heads rather than if checks inside the function bodies. I don't feel strongly about this preference, but I like how it looks in this case. What do you think?

Service.subscription_topics_with_queue_group(service)
|> Enum.map(fn
({topic, nil}) ->
{:ok, subscription} = Gnat.sub(connection_pid, self(), topic)
subscription

({topic, queue_group}) ->
{:ok, subscription} = Gnat.sub(connection_pid, self(), topic, queue_group: queue_group)
subscription
end)
end

defp subscribe_to_topics(state, connection_pid) do
Enum.map(state.subscription_topics, fn(topic_and_queue_group) ->
topic = Map.fetch!(topic_and_queue_group, :topic)
{:ok, subscription} = case Map.get(topic_and_queue_group, :queue_group) do
nil -> Gnat.sub(connection_pid, self(), topic)
queue_group -> Gnat.sub(connection_pid, self(), topic, queue_group: queue_group)
end
subscription
end)

%{ state | subscriptions: subscriptions }
end

defp initialize_as_microservice(state, connection_pid) do
if :ets.whereis(:endpoint_stats) == :undefined do
:ets.new(:endpoint_stats, [:public, :set, :named_table])
defp maybe_append_service(state, %{service_definition: config}) do
case Service.init(config) do
{:ok, service} ->
{:ok, Map.put(state, :service, service)}

{:error, errors} ->
{:stop, "Invalid service configuration: #{Enum.join(errors, ",")}"}
end
{:ok, responder_pid} = ServiceResponder.start_link(%{ state | connection_pid: connection_pid })
endpoints = get_in(state, [:service_config, :endpoints])
end

subscriptions = Enum.map(endpoints, fn(ep) ->
subject = ServiceResponder.derive_subscription_subject(ep)
queue_group = Map.get(ep, :queue_group, @default_service_queue_group)
{:ok, subscription} = Gnat.sub(connection_pid, self(), subject, queue_group: queue_group)
defp maybe_append_service(state, _), do: {:ok, state}

subscription
end)
defp maybe_append_module(state, %{module: module}) do
{:ok, Map.put(state, :module, module)}
end

%{state | svc_responder_pid: responder_pid, subscriptions: subscriptions }
defp maybe_append_module(state, _), do: {:ok, state}

defp maybe_append_consuming_function(state, %{consuming_function: consuming_function}) do
{:ok, Map.put(state, :consuming_function, consuming_function)}
end

defp maybe_append_consuming_function(state, _), do: {:ok, state}

defp validate_state(state) do
partial = Map.take(state, [:module, :consuming_function])
case Enum.count(partial) do
0 ->
{:stop, "You must provide a module or consuming function for the consumer supervisor"}

1 ->
:ok

_ ->
{:stop, "You cannot provide both a module and consuming function. Please specify one or the other."}
end
end
end
31 changes: 19 additions & 12 deletions lib/gnat/services/server.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Gnat.Services.Server do
require Logger
alias Gnat.Services.{Service, ServiceResponder}

@moduledoc """
A behavior for acting as a NATS service
Expand Down Expand Up @@ -31,7 +32,6 @@ defmodule Gnat.Services.Server do
end
end
"""
alias Gnat.Services.ServiceResponder

@doc """
Called when a message is received from the broker. The endpoint on which the message arrived
Expand Down Expand Up @@ -72,8 +72,8 @@ alias Gnat.Services.ServiceResponder

@typedoc """
Service configuration is provided as part of the consumer supervisor settings in the `service_definition` field.
You can specify _either_ the `subscription_topics` field for a reguar server or the `service_definition` field for a
new NATS service.
You can specify _either_ the `subscription_topics` field for a regluar server or the `service_definition` field
for a new NATS service.

* `name` - The name of the service. Needs to conform to the rules for NATS service names
* `version` - A required version number (w/out "v" prefix) conforming to semver rules
Expand All @@ -86,7 +86,6 @@ alias Gnat.Services.ServiceResponder
required(:name) => binary(),
required(:version) => binary(),
required(:endpoints) => [endpoint_configuration()],
optional(:queue_group) => binary(),
optional(:description) => binary(),
optional(:metadata) => map(),
}
Expand All @@ -98,7 +97,7 @@ alias Gnat.Services.ServiceResponder
* `subject` - A specific subject for this endpoint to listen on. If this is not provided, then the endpoint name will be used.
* `name` - The required name of the endpoint
* `group_name` - An optional group to which this endpoint belongs
* `queue_group` - A queue group for this endpoint's subscription. If not supplied, "q" will be used.
* `queue_group` - A queue group for this endpoint's subscription. If not supplied, "q" will be used (indicated by protocol spec).
* `metadata` - An optional string->string map containing metadata for this endpoint
"""
@type endpoint_configuration :: %{
Expand All @@ -111,20 +110,28 @@ alias Gnat.Services.ServiceResponder


@doc false
def execute(module, message, responder_pid) do
def execute(_module, %{topic: "$SRV" <> _} = message, service) do
ServiceResponder.maybe_respond(message, service)
end

def execute(module, message, service) do
try do
{endpoint, group} = ServiceResponder.lookup_endpoint(responder_pid, message.topic)
endpoint = Map.get(service.subjects, message.topic)
%{group_name: group_name, name: endpoint_name} = endpoint
telemetry_tags = %{topic: message.topic, endpoint: endpoint_name, group: group_name}

case :timer.tc(fn -> apply(module, :request, [message, endpoint, group]) end) do
case :timer.tc(fn -> apply(module, :request, [message, endpoint_name, group_name]) end) do
{_elapsed, :ok} -> :done
{elapsed_micros, {:reply, data}} ->
send_reply(message, data)
:telemetry.execute([:gnat, :service_request], %{latency: elapsed_micros}, %{topic: message.topic, endpoint: endpoint, group: group})
ServiceResponder.record_request(responder_pid, message.topic, elapsed_micros * 1000 )
:telemetry.execute([:gnat, :service_request], %{latency: elapsed_micros}, telemetry_tags)
Service.record_request(endpoint, elapsed_micros)

{elapsed_micros, {:error, error}} ->
execute_error(module, message, error)
:telemetry.execute([:gnat, :service_error], %{latency: elapsed_micros}, %{topic: message.topic, endpoint: endpoint, group: group})
ServiceResponder.record_error(responder_pid, message.topic, elapsed_micros * 1000, inspect(error))
:telemetry.execute([:gnat, :service_error], %{latency: elapsed_micros}, telemetry_tags)
Service.record_error(endpoint, elapsed_micros)

other -> execute_error(module, message, other)
end

Expand Down
Loading