From c9a61c8a32e8b351c51049d176ff6fbaf66e271a Mon Sep 17 00:00:00 2001 From: cabol Date: Sun, 19 Mar 2023 18:08:08 +0100 Subject: [PATCH] [#49] New Redis Cluster management strategy --- .formatter.exs | 12 +- .gitignore | 1 + README.md | 8 +- coveralls.json | 2 + lib/nebulex_redis_adapter.ex | 151 +++++---- lib/nebulex_redis_adapter/bootstrap_server.ex | 7 +- lib/nebulex_redis_adapter/client_cluster.ex | 6 +- lib/nebulex_redis_adapter/command.ex | 127 ++++++-- lib/nebulex_redis_adapter/exceptions.ex | 45 +++ lib/nebulex_redis_adapter/helpers.ex | 47 +++ lib/nebulex_redis_adapter/redis_cluster.ex | 284 +++++++---------- .../redis_cluster/config_manager.ex | 300 ++++++++++++++++++ .../redis_cluster/dynamic_supervisor.ex | 25 ++ .../{supervisor.ex => pool_supervisor.ex} | 2 +- mix.exs | 2 +- test/docker/cluster/redis-node-0.conf | 2 +- test/docker/cluster/redis-node-1.conf | 2 +- test/docker/cluster/redis-node-2.conf | 2 +- test/docker/cluster/redis-node-3.conf | 2 +- test/docker/cluster/redis-node-4.conf | 2 +- test/docker/cluster/redis-node-5.conf | 2 +- test/nebulex_redis_adapter/command_test.exs | 9 +- .../redis_cluster_test.exs | 118 ++++++- test/test_helper.exs | 4 +- 24 files changed, 876 insertions(+), 286 deletions(-) create mode 100644 lib/nebulex_redis_adapter/exceptions.ex create mode 100644 lib/nebulex_redis_adapter/helpers.ex create mode 100644 lib/nebulex_redis_adapter/redis_cluster/config_manager.ex create mode 100644 lib/nebulex_redis_adapter/redis_cluster/dynamic_supervisor.ex rename lib/nebulex_redis_adapter/redis_cluster/{supervisor.ex => pool_supervisor.ex} (94%) diff --git a/.formatter.exs b/.formatter.exs index d2cda26..6634694 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,12 @@ -# Used by "mix format" +locals_without_parens = [ + # Helpers + wrap_error: 1, + wrap_error: 2 +] + [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], + line_length: 100, + locals_without_parens: locals_without_parens, + export: [locals_without_parens: locals_without_parens] ] diff --git a/.gitignore b/.gitignore index 5db2872..5f5d858 100644 --- a/.gitignore +++ b/.gitignore @@ -16,5 +16,6 @@ erl_crash.dump ._* .elixir* .vs* +*.coverdata /priv /nebulex diff --git a/README.md b/README.md index e10bce8..761ece0 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,13 @@ config :my_app, MyApp.RedisClusterCache, ] ``` -The pool of connections against the different master nodes is automatically +**IMPORTANT:** The option `:master_nodes` has been removed in favor of the + new cluster management strategy. Instead of `:master_nodes`, you just need + to configure the `:conn_opts` pointing to your configuration endpoint + (which could be one of the master nodes). See + [Redis Cluster options](https://hexdocs.pm/nebulex_redis_adapter/NebulexRedisAdapter.html#module-redis-cluster-options) + +The pool of connections to the different master nodes is automatically configured by the adapter once it gets the cluster slots info. 
> This one could be the easiest and recommended way for distributed caching diff --git a/coveralls.json b/coveralls.json index 4a52828..9ac930e 100644 --- a/coveralls.json +++ b/coveralls.json @@ -1,5 +1,7 @@ { "skip_files": [ + "lib/nebulex_redis_adapter/exceptions.ex", + "lib/nebulex_redis_adapter/helpers.ex", "test/*" ], diff --git a/lib/nebulex_redis_adapter.ex b/lib/nebulex_redis_adapter.ex index cfa7924..677cab0 100644 --- a/lib/nebulex_redis_adapter.ex +++ b/lib/nebulex_redis_adapter.ex @@ -19,9 +19,9 @@ defmodule NebulexRedisAdapter do provides a simple client-side cluster implementation based on Sharding distribution model via `:client_side_cluster` mode. - ## Shared Options + ## Configuration options - In addition to `Nebulex.Cache` shared options, this adapters supports the + In addition to `Nebulex.Cache` config options, the adapter supports the following options: * `:mode` - Defines the mode Redis will be set up. It can be one of the @@ -37,6 +37,22 @@ defmodule NebulexRedisAdapter do * `:serializer` - Custom serializer module implementing the `NebulexRedisAdapter.Serializer` behaviour. + ## Shared runtime options + + Since the adapter runs on top of `Redix`, all commands accept their options + (e.g.: `:timeout`, and `:telemetry_metadata`). See `Redix` docs for more + information. + + Additionally, the adapter support the following options for all commands too: + + * `:lock_retries` - This option is specific to the `:redis_cluster` mode. + When the config manager is running and setting up the hash slot map, + all Redis commands get blocked until the cluster is properly configured + and the hash slot map is ready to use. This option defines the max retry + attempts to acquire the lock and execute the command in case the + configuration manager is running and all commands are locked.. + Defaults to `:infinity`. + ## Telemetry events This adapter emits the recommended Telemetry events. @@ -104,7 +120,13 @@ defmodule NebulexRedisAdapter do password: "password" ] - ### Redis Cluster Options + **IMPORTANT:** The option `:master_nodes` has been removed in favor of the + new cluster management strategy. Instead of `:master_nodes`, you just need + to configure the `:conn_opts` pointing to your configuration endpoint + (which could be one of the master nodes). See the **"Redis Cluster options"** + next. 
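For illustration, here is a minimal configuration sketch matching the note above. The app and cache names (`:my_app`, `MyApp.RedisClusterCache`) and the `127.0.0.1:7000` endpoint are placeholders rather than values taken from this patch, and the cache module is assumed to be defined as shown in the README.

```elixir
# config/config.exs (hypothetical app and cache names)
import Config

config :my_app, MyApp.RedisClusterCache,
  mode: :redis_cluster,
  # Point :conn_opts at the configuration endpoint (for example, one of
  # the master nodes); the adapter discovers the remaining nodes from the
  # hash slot map it fetches at startup.
  conn_opts: [
    host: "127.0.0.1",
    port: 7000
  ]
```

At runtime, every command accepts the shared options listed above, for example `MyApp.RedisClusterCache.put("foo", "bar", lock_retries: 10, timeout: 5_000)`; the `:lock_retries` option only takes effect in `:redis_cluster` mode, while the configuration manager is rebuilding the hash slot map.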
+ + ### Redis Cluster options In addition to shared options, `:redis_cluster` mode supports the following options: @@ -474,7 +496,7 @@ defmodule NebulexRedisAdapter do ## Nebulex.Adapter.Entry @impl true - defspan get(adapter_meta, key, _opts) do + defspan get(adapter_meta, key, opts) do %{ serializer: serializer, encode_key_opts: enc_key_opts, @@ -482,24 +504,25 @@ defmodule NebulexRedisAdapter do } = adapter_meta adapter_meta - |> Command.exec!(["GET", serializer.encode_key(key, enc_key_opts)], key) + |> Command.exec!(["GET", serializer.encode_key(key, enc_key_opts)], key, opts) |> serializer.decode_value(dec_value_opts) end @impl true - defspan get_all(adapter_meta, keys, _opts) do - do_get_all(adapter_meta, keys) + defspan get_all(adapter_meta, keys, opts) do + do_get_all(adapter_meta, keys, opts) end - defp do_get_all(%{mode: :standalone} = adapter_meta, keys) do - mget(nil, adapter_meta, keys) + defp do_get_all(%{mode: :standalone} = adapter_meta, keys, opts) do + mget(nil, adapter_meta, keys, opts) end - defp do_get_all(adapter_meta, keys) do + defp do_get_all(adapter_meta, keys, opts) do keys |> group_keys_by_hash_slot(adapter_meta) |> Enum.reduce(%{}, fn {hash_slot, keys}, acc -> - return = mget(hash_slot, adapter_meta, keys) + return = mget(hash_slot, adapter_meta, keys, opts) + Map.merge(acc, return) end) end @@ -511,12 +534,14 @@ defmodule NebulexRedisAdapter do encode_key_opts: enc_key_opts, decode_value_opts: dec_value_opts } = adapter_meta, - keys + keys, + opts ) do adapter_meta |> Command.exec!( ["MGET" | Enum.map(keys, &serializer.encode_key(&1, enc_key_opts))], - hash_slot_key + hash_slot_key, + opts ) |> Enum.reduce({keys, %{}}, fn nil, {[_key | keys], acc} -> @@ -529,7 +554,7 @@ defmodule NebulexRedisAdapter do end @impl true - defspan put(adapter_meta, key, value, ttl, on_write, _opts) do + defspan put(adapter_meta, key, value, ttl, on_write, opts) do %{ serializer: serializer, encode_key_opts: enc_key_opts, @@ -540,25 +565,25 @@ defmodule NebulexRedisAdapter do redis_v = serializer.encode_value(value, enc_value_opts) cmd_opts = cmd_opts(action: on_write, ttl: fix_ttl(ttl)) - case Command.exec!(adapter_meta, ["SET", redis_k, redis_v | cmd_opts], key) do + case Command.exec!(adapter_meta, ["SET", redis_k, redis_v | cmd_opts], key, opts) do "OK" -> true nil -> false end end @impl true - defspan put_all(adapter_meta, entries, ttl, on_write, _opts) do + defspan put_all(adapter_meta, entries, ttl, on_write, opts) do ttl = fix_ttl(ttl) case adapter_meta.mode do :standalone -> - do_put_all(adapter_meta, nil, entries, ttl, on_write) + do_put_all(adapter_meta, nil, entries, ttl, on_write, opts) _ -> entries |> group_keys_by_hash_slot(adapter_meta) |> Enum.reduce(:ok, fn {hash_slot, group}, acc -> - acc && do_put_all(adapter_meta, hash_slot, group, ttl, on_write) + acc && do_put_all(adapter_meta, hash_slot, group, ttl, on_write, opts) end) end end @@ -572,7 +597,8 @@ defmodule NebulexRedisAdapter do hash_slot, entries, ttl, - on_write + on_write, + opts ) do cmd = case on_write do @@ -593,7 +619,7 @@ defmodule NebulexRedisAdapter do end) adapter_meta - |> Command.pipeline!([Enum.reverse(mset) | expire], hash_slot) + |> Command.pipeline!([Enum.reverse(mset) | expire], hash_slot, opts) |> hd() |> case do "OK" -> :ok @@ -603,17 +629,17 @@ defmodule NebulexRedisAdapter do end @impl true - defspan delete(adapter_meta, key, _opts) do - _ = Command.exec!(adapter_meta, ["DEL", enc_key(adapter_meta, key)], key) + defspan delete(adapter_meta, key, opts) do + _ = 
Command.exec!(adapter_meta, ["DEL", enc_key(adapter_meta, key)], key, opts) :ok end @impl true - defspan take(adapter_meta, key, _opts) do + defspan take(adapter_meta, key, opts) do redis_k = enc_key(adapter_meta, key) - with_pipeline(adapter_meta, key, [["GET", redis_k], ["DEL", redis_k]]) + with_pipeline(adapter_meta, key, [["GET", redis_k], ["DEL", redis_k]], opts) end @impl true @@ -667,67 +693,74 @@ defmodule NebulexRedisAdapter do end @impl true - defspan update_counter(adapter_meta, key, incr, ttl, default, _opts) do - do_update_counter(adapter_meta, key, incr, ttl, default) + defspan update_counter(adapter_meta, key, incr, ttl, default, opts) do + do_update_counter(adapter_meta, key, incr, ttl, default, opts) end - defp do_update_counter(adapter_meta, key, incr, :infinity, default) do + defp do_update_counter(adapter_meta, key, incr, :infinity, default, opts) do redis_k = enc_key(adapter_meta, key) adapter_meta - |> maybe_incr_default(key, redis_k, default) - |> Command.exec!(["INCRBY", redis_k, incr], key) + |> maybe_incr_default(key, redis_k, default, opts) + |> Command.exec!(["INCRBY", redis_k, incr], key, opts) end - defp do_update_counter(adapter_meta, key, incr, ttl, default) do + defp do_update_counter(adapter_meta, key, incr, ttl, default, opts) do redis_k = enc_key(adapter_meta, key) adapter_meta - |> maybe_incr_default(key, redis_k, default) - |> Command.pipeline!([["INCRBY", redis_k, incr], ["EXPIRE", redis_k, fix_ttl(ttl)]], key) + |> maybe_incr_default(key, redis_k, default, opts) + |> Command.pipeline!( + [["INCRBY", redis_k, incr], ["EXPIRE", redis_k, fix_ttl(ttl)]], + key, + opts + ) |> hd() end - defp maybe_incr_default(adapter_meta, key, redis_k, default) + defp maybe_incr_default(adapter_meta, key, redis_k, default, opts) when is_integer(default) and default > 0 do - case Command.exec!(adapter_meta, ["EXISTS", redis_k], key) do + case Command.exec!(adapter_meta, ["EXISTS", redis_k], key, opts) do 1 -> adapter_meta 0 -> - _ = Command.exec!(adapter_meta, ["INCRBY", redis_k, default], key) + _ = Command.exec!(adapter_meta, ["INCRBY", redis_k, default], key, opts) + adapter_meta end end - defp maybe_incr_default(adapter_meta, _, _, _), do: adapter_meta + defp maybe_incr_default(adapter_meta, _, _, _, _) do + adapter_meta + end ## Nebulex.Adapter.Queryable @impl true - defspan execute(adapter_meta, operation, query, _opts) do - do_execute(adapter_meta, operation, query) + defspan execute(adapter_meta, operation, query, opts) do + do_execute(adapter_meta, operation, query, opts) end - defp do_execute(%{mode: mode} = adapter_meta, :count_all, nil) do - exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)]) + defp do_execute(%{mode: mode} = adapter_meta, :count_all, nil, opts) do + exec!(mode, [adapter_meta, ["DBSIZE"], opts], [0, &Kernel.+(&2, &1)]) end - defp do_execute(%{mode: mode} = adapter_meta, :delete_all, nil) do - size = exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)]) - _ = exec!(mode, [adapter_meta, ["FLUSHDB"]], []) + defp do_execute(%{mode: mode} = adapter_meta, :delete_all, nil, opts) do + size = exec!(mode, [adapter_meta, ["DBSIZE"], opts], [0, &Kernel.+(&2, &1)]) + _ = exec!(mode, [adapter_meta, ["FLUSHDB"], opts], []) size end - defp do_execute(%{mode: :standalone} = adapter_meta, :delete_all, {:in, keys}) + defp do_execute(%{mode: :standalone} = adapter_meta, :delete_all, {:in, keys}, opts) when is_list(keys) do - _ = Command.exec!(adapter_meta, ["DEL" | Enum.map(keys, &enc_key(adapter_meta, &1))]) + _ = 
Command.exec!(adapter_meta, ["DEL" | Enum.map(keys, &enc_key(adapter_meta, &1))], opts) length(keys) end - defp do_execute(adapter_meta, :delete_all, {:in, keys}) + defp do_execute(adapter_meta, :delete_all, {:in, keys}, opts) when is_list(keys) do :ok = keys @@ -736,22 +769,23 @@ defmodule NebulexRedisAdapter do Command.exec!( adapter_meta, ["DEL" | Enum.map(keys_group, &enc_key(adapter_meta, &1))], - hash_slot + hash_slot, + opts ) end) length(keys) end - defp do_execute(adapter_meta, :all, query) do - execute_query(query, adapter_meta) + defp do_execute(adapter_meta, :all, query, opts) do + execute_query(query, adapter_meta, opts) end @impl true - defspan stream(adapter_meta, query, _opts) do + defspan stream(adapter_meta, query, opts) do Stream.resource( fn -> - execute_query(query, adapter_meta) + execute_query(query, adapter_meta, opts) end, fn [] -> {:halt, []} @@ -773,10 +807,11 @@ defmodule NebulexRedisAdapter do defp with_pipeline( %{serializer: serializer, decode_value_opts: dec_val_opts} = adapter_meta, key, - pipeline + pipeline, + opts ) do adapter_meta - |> Command.pipeline!(pipeline, key) + |> Command.pipeline!(pipeline, key, opts) |> hd() |> serializer.decode_value(dec_val_opts) end @@ -797,17 +832,17 @@ defmodule NebulexRedisAdapter do "expected ttl: to be an integer >= 1000 or :infinity, got: #{inspect(ttl)}" end - defp execute_query(nil, %{serializer: serializer} = adapter_meta) do + defp execute_query(nil, %{serializer: serializer} = adapter_meta, opts) do "*" - |> execute_query(adapter_meta) + |> execute_query(adapter_meta, opts) |> Enum.map(&serializer.decode_key/1) end - defp execute_query(pattern, %{mode: mode} = adapter_meta) when is_binary(pattern) do - exec!(mode, [adapter_meta, ["KEYS", pattern]], [[], &Kernel.++(&1, &2)]) + defp execute_query(pattern, %{mode: mode} = adapter_meta, opts) when is_binary(pattern) do + exec!(mode, [adapter_meta, ["KEYS", pattern], opts], [[], &Kernel.++(&1, &2)]) end - defp execute_query(pattern, _adapter_meta) do + defp execute_query(pattern, _adapter_meta, _opts) do raise Nebulex.QueryError, message: "invalid pattern", query: pattern end diff --git a/lib/nebulex_redis_adapter/bootstrap_server.ex b/lib/nebulex_redis_adapter/bootstrap_server.ex index 345064f..f5ee7dd 100644 --- a/lib/nebulex_redis_adapter/bootstrap_server.ex +++ b/lib/nebulex_redis_adapter/bootstrap_server.ex @@ -10,6 +10,7 @@ defmodule NebulexRedisAdapter.BootstrapServer do alias Nebulex.Telemetry alias Nebulex.Telemetry.StatsHandler + alias NebulexRedisAdapter.RedisCluster ## API @@ -38,7 +39,11 @@ defmodule NebulexRedisAdapter.BootstrapServer do @impl true def terminate(_reason, adapter_meta) do - if ref = adapter_meta.stats_counter, do: Telemetry.detach(ref) + _ = if ref = adapter_meta.stats_counter, do: Telemetry.detach(ref) + + if adapter_meta.mode == :redis_cluster do + RedisCluster.del_status_key(adapter_meta.name) + end end ## Private Functions diff --git a/lib/nebulex_redis_adapter/client_cluster.ex b/lib/nebulex_redis_adapter/client_cluster.ex index 2b97c47..6d5e24b 100644 --- a/lib/nebulex_redis_adapter/client_cluster.ex +++ b/lib/nebulex_redis_adapter/client_cluster.ex @@ -2,6 +2,8 @@ defmodule NebulexRedisAdapter.ClientCluster do # Client-side Cluster @moduledoc false + import NebulexRedisAdapter.Helpers + alias NebulexRedisAdapter.ClientCluster.Keyslot, as: ClientClusterKeyslot alias NebulexRedisAdapter.ClientCluster.Supervisor, as: ClientClusterSupervisor alias NebulexRedisAdapter.Pool @@ -47,19 +49,21 @@ defmodule NebulexRedisAdapter.ClientCluster 
do @spec exec!( Nebulex.Adapter.adapter_meta(), Redix.command(), + Keyword.t(), init_acc :: any, reducer :: (any, any -> any) ) :: any | no_return def exec!( %{name: name, registry: registry, nodes: nodes}, command, + opts, init_acc \\ nil, reducer \\ fn res, _ -> res end ) do Enum.reduce(nodes, init_acc, fn {node_name, pool_size}, acc -> registry |> Pool.get_conn({name, node_name}, pool_size) - |> Redix.command!(command) + |> Redix.command!(command, redis_command_opts(opts)) |> reducer.(acc) end) end diff --git a/lib/nebulex_redis_adapter/command.ex b/lib/nebulex_redis_adapter/command.ex index 7f9db3f..5c6f973 100644 --- a/lib/nebulex_redis_adapter/command.ex +++ b/lib/nebulex_redis_adapter/command.ex @@ -2,7 +2,14 @@ defmodule NebulexRedisAdapter.Command do # Redix command executor @moduledoc false - alias NebulexRedisAdapter.{ClientCluster, Pool, RedisCluster} + import NebulexRedisAdapter.Helpers + + alias NebulexRedisAdapter.{ + ClientCluster, + Pool, + RedisCluster, + RedisCluster.ConfigManager + } ## API @@ -12,12 +19,13 @@ defmodule NebulexRedisAdapter.Command do @spec exec( Nebulex.Adapter.adapter_meta(), Redix.command(), - Nebulex.Cache.key() + Nebulex.Cache.key(), + Keyword.t() ) :: {:ok, term} | {:error, term} - def exec(adapter_meta, command, key \\ nil) do + def exec(adapter_meta, command, key \\ nil, opts \\ []) do adapter_meta - |> conn(key) - |> Redix.command(command) + |> conn(key, opts) + |> Redix.command(command, redis_command_opts(opts)) end @doc """ @@ -26,13 +34,32 @@ defmodule NebulexRedisAdapter.Command do @spec exec!( Nebulex.Adapter.adapter_meta(), Redix.command(), - Nebulex.Cache.key() + Nebulex.Cache.key(), + Keyword.t() ) :: term - def exec!(adapter_meta, command, key \\ nil) do + def exec!(adapter_meta, command, key \\ nil, opts \\ []) + + def exec!(%{mode: :redis_cluster, name: name} = adapter_meta, command, key, opts) do + on_moved = fn -> + # Re-configure the cluster + :ok = ConfigManager.setup_shards(name) + + # Retry once more + do_exec!(adapter_meta, command, key, opts, nil) + end + + do_exec!(adapter_meta, command, key, opts, on_moved) + end + + def exec!(adapter_meta, command, key, opts) do + do_exec!(adapter_meta, command, key, opts, nil) + end + + defp do_exec!(adapter_meta, command, key, opts, on_moved) do adapter_meta - |> conn(key) - |> Redix.command(command) - |> handle_command_response() + |> conn(key, opts) + |> Redix.command(command, redis_command_opts(opts)) + |> handle_command_response(on_moved) end @doc """ @@ -41,12 +68,13 @@ defmodule NebulexRedisAdapter.Command do @spec pipeline( Nebulex.Adapter.adapter_meta(), [Redix.command()], - Nebulex.Cache.key() + Nebulex.Cache.key(), + Keyword.t() ) :: {:ok, [term]} | {:error, term} - def pipeline(adapter_meta, commands, key \\ nil) do + def pipeline(adapter_meta, commands, key \\ nil, opts \\ []) do adapter_meta - |> conn(key) - |> Redix.pipeline(commands) + |> conn(key, opts) + |> Redix.pipeline(commands, redis_command_opts(opts)) end @doc """ @@ -55,29 +83,59 @@ defmodule NebulexRedisAdapter.Command do @spec pipeline!( Nebulex.Adapter.adapter_meta(), [Redix.command()], - Nebulex.Cache.key() + Nebulex.Cache.key(), + Keyword.t() ) :: [term] - def pipeline!(adapter_meta, commands, key \\ nil) do + def pipeline!(adapter_meta, commands, key \\ nil, opts \\ []) + + def pipeline!(%{mode: :redis_cluster, name: name} = adapter_meta, commands, key, opts) do + on_moved = fn -> + # Re-configure the cluster + :ok = ConfigManager.setup_shards(name) + + # Retry once more + do_pipeline!(adapter_meta, commands, 
key, opts, nil) + end + + do_pipeline!(adapter_meta, commands, key, opts, on_moved) + end + + def pipeline!(adapter_meta, commands, key, opts) do + do_pipeline!(adapter_meta, commands, key, opts, nil) + end + + defp do_pipeline!(adapter_meta, commands, key, opts, on_moved) do adapter_meta - |> conn(key) - |> Redix.pipeline(commands) - |> handle_command_response() - |> check_pipeline_errors() + |> conn(key, opts) + |> Redix.pipeline(commands, redis_command_opts(opts)) + |> handle_command_response(on_moved) + |> check_pipeline_errors(on_moved) end ## Private Functions - defp conn(%{mode: :standalone, name: name, registry: registry, pool_size: pool_size}, _key) do + defp conn( + %{mode: :standalone, name: name, registry: registry, pool_size: pool_size}, + _key, + _opts + ) do Pool.get_conn(registry, name, pool_size) end - defp conn(%{mode: :redis_cluster} = meta, key) do - RedisCluster.get_conn(meta, key) + defp conn(%{mode: :redis_cluster, name: name} = meta, key, opts) do + with nil <- RedisCluster.get_conn(meta, key, opts) do + # Perhars the cluster has to be re-configured again + :ok = ConfigManager.setup_shards(name) + + # Retry once more + RedisCluster.get_conn(meta, key, opts) + end end defp conn( %{mode: :client_side_cluster, name: name, registry: registry, nodes: nodes}, - {:"$hash_slot", node_name} + {:"$hash_slot", node_name}, + _opts ) do ClientCluster.get_conn(registry, name, nodes, node_name) end @@ -90,23 +148,32 @@ defmodule NebulexRedisAdapter.Command do nodes: nodes, keyslot: keyslot }, - key + key, + _opts ) do ClientCluster.get_conn(registry, name, nodes, key, keyslot) end - defp handle_command_response({:ok, response}) do + defp handle_command_response({:ok, response}, _on_moved) do response end - defp handle_command_response({:error, reason}) do + defp handle_command_response({:error, %Redix.Error{message: "MOVED" <> _}}, on_moved) + when is_function(on_moved) do + on_moved.() + end + + defp handle_command_response({:error, reason}, _on_moved) do raise reason end - defp check_pipeline_errors(results) do + defp check_pipeline_errors(results, on_moved) do Enum.map(results, fn - %Redix.Error{} = error -> raise error - result -> result + %Redix.Error{} = error -> + handle_command_response({:error, error}, on_moved) + + result -> + result end) end end diff --git a/lib/nebulex_redis_adapter/exceptions.ex b/lib/nebulex_redis_adapter/exceptions.ex new file mode 100644 index 0000000..580fbbe --- /dev/null +++ b/lib/nebulex_redis_adapter/exceptions.ex @@ -0,0 +1,45 @@ +defmodule NebulexRedisAdapter.Error do + @moduledoc """ + NebulexRedisAdapter error. + """ + + @typedoc "Error reason type" + @type reason :: :atom | {:atom, term} + + @typedoc "Error type" + @type t :: %__MODULE__{reason: reason, cache: atom} + + # Exception struct + defexception reason: nil, cache: nil + + ## API + + @doc false + def exception(opts) do + reason = Keyword.fetch!(opts, :reason) + cache = Keyword.fetch!(opts, :cache) + + %__MODULE__{reason: reason, cache: cache} + end + + @doc false + def message(%__MODULE__{reason: reason, cache: cache}) do + format_error(reason, cache) + end + + ## Helpers + + def format_error(:redis_cluster_status_error, cache) do + "Could not run the command because Redis Cluster is in error status " <> + "for cache #{inspect(cache)}." + end + + def format_error({:redis_cluster_setup_error, reason}, cache) when is_exception(reason) do + "Could not setup Redis Cluster for cache #{inspect(cache)}. 
" <> Exception.message(reason) + end + + def format_error({:redis_cluster_setup_error, reason}, cache) do + "Could not setup Redis Cluster for cache #{inspect(cache)}. " <> + "Failed with error #{inspect(reason)}." + end +end diff --git a/lib/nebulex_redis_adapter/helpers.ex b/lib/nebulex_redis_adapter/helpers.ex new file mode 100644 index 0000000..8927b9e --- /dev/null +++ b/lib/nebulex_redis_adapter/helpers.ex @@ -0,0 +1,47 @@ +defmodule NebulexRedisAdapter.Helpers do + @moduledoc false + + import Bitwise, only: [<<<: 2] + + # Inline common instructions + @compile {:inline, redis_command_opts: 1} + + ## API + + @spec redis_command_opts(Keyword.t()) :: Keyword.t() + def redis_command_opts(opts) do + Keyword.take(opts, [:timeout, :telemetry_metadata]) + end + + @spec random_sleep(pos_integer) :: :ok + def random_sleep(times) do + t = random_timeout(times) + + receive do + after + t -> :ok + end + end + + @spec random_timeout(pos_integer) :: pos_integer + def random_timeout(times) do + _ = if rem(times, 10) == 0, do: :rand.seed(:exsplus) + + # First time 1/4 seconds, then doubling each time up to 8 seconds max + tmax = + if times > 5 do + 8000 + else + div((1 <<< times) * 1000, 8) + end + + :rand.uniform(tmax) + end + + @doc false + defmacro wrap_error(exception, opts) do + quote do + {:error, unquote(exception).exception(unquote(opts))} + end + end +end diff --git a/lib/nebulex_redis_adapter/redis_cluster.ex b/lib/nebulex_redis_adapter/redis_cluster.ex index 95227ff..f662894 100644 --- a/lib/nebulex_redis_adapter/redis_cluster.ex +++ b/lib/nebulex_redis_adapter/redis_cluster.ex @@ -2,11 +2,13 @@ defmodule NebulexRedisAdapter.RedisCluster do # Redis Cluster Manager @moduledoc false - alias NebulexRedisAdapter.{Connection, Pool} + import NebulexRedisAdapter.Helpers + + alias NebulexRedisAdapter.Pool alias NebulexRedisAdapter.RedisCluster.Keyslot, as: RedisClusterKeyslot @typedoc "Proxy type to the adapter meta" - @type adapter_meta :: Nebulex.Adapter.metadata() + @type adapter_meta :: Nebulex.Adapter.adapter_meta() # Redis cluster hash slots size @redis_cluster_hash_slots 16_384 @@ -14,87 +16,86 @@ defmodule NebulexRedisAdapter.RedisCluster do ## API @spec init(adapter_meta, Keyword.t()) :: {Supervisor.child_spec(), adapter_meta} - def init(%{name: name, registry: registry, pool_size: pool_size} = adapter_meta, opts) do - # Get the hash slots topology - cluster_shards = get_cluster_shards(opts) - - # Init ETS table to store hash slots - cluster_shards_tab = init_cluster_shards_table(name) - - # Build specs for the cluster - cluster_slot_specs = - for {start, stop, master_host, master_port} <- cluster_shards do - # Define slot id - slot_id = {:cluster_shards, start, stop} - - # Store mapping between cluster slot and supervisor name - true = :ets.insert(cluster_shards_tab, slot_id) - - # Define options - opts = - Keyword.merge(opts, - slot_id: slot_id, - registry: registry, - pool_size: pool_size, - master_host: master_host, - master_port: master_port - ) - - # Define child spec - Supervisor.child_spec( - {NebulexRedisAdapter.RedisCluster.Supervisor, opts}, - type: :supervisor, - id: slot_id - ) - end - - cluster_shards_supervisor_spec = %{ - id: :cluster_shards_supervisor, - type: :supervisor, - start: {Supervisor, :start_link, [cluster_slot_specs, [strategy: :one_for_one]]} - } + def init(%{name: name} = adapter_meta, opts) do + # Init ETS table to store the hash slot map + cluster_shards_tab = init_hash_slot_map_table(name) adapter_meta = adapter_meta |> Map.put(:cluster_shards_tab, 
cluster_shards_tab) |> Map.update(:keyslot, RedisClusterKeyslot, &(&1 || RedisClusterKeyslot)) + children = [ + {NebulexRedisAdapter.RedisCluster.DynamicSupervisor, {adapter_meta, opts}}, + {NebulexRedisAdapter.RedisCluster.ConfigManager, {adapter_meta, opts}} + ] + + cluster_shards_supervisor_spec = %{ + id: {name, RedisClusterSupervisor}, + type: :supervisor, + start: {Supervisor, :start_link, [children, [strategy: :rest_for_one]]} + } + {cluster_shards_supervisor_spec, adapter_meta} end - @spec exec!(adapter_meta, Redix.command(), init_acc :: any, (any, any -> any)) :: any + @spec exec!(adapter_meta, Redix.command(), Keyword.t(), init_acc :: any, (any, any -> any)) :: + any def exec!( - %{registry: registry, pool_size: pool_size} = adapter_meta, + %{ + name: name, + cluster_shards_tab: cluster_shards_tab, + registry: registry, + pool_size: pool_size + }, command, + opts, init_acc \\ nil, reducer \\ fn res, _ -> res end ) do - adapter_meta.cluster_shards_tab - |> :ets.lookup(:cluster_shards) - |> Enum.reduce(init_acc, fn slot_id, acc -> - registry - |> Pool.get_conn(slot_id, pool_size) - |> Redix.command!(command) - |> reducer.(acc) + with_retry(name, Keyword.get(opts, :lock_retries, :infinity), fn -> + cluster_shards_tab + |> :ets.lookup(:cluster_shards) + |> Enum.reduce(init_acc, fn slot_id, acc -> + registry + |> Pool.get_conn(slot_id, pool_size) + |> Redix.command!(command, redis_command_opts(opts)) + |> reducer.(acc) + end) end) end - @spec get_conn(adapter_meta, {:"$hash_slot", any} | any) :: pid | nil - def get_conn(%{registry: registry, pool_size: pool_size} = adapter_meta, key) do - {:"$hash_slot", hash_slot} = - case key do - {:"$hash_slot", _} -> key - _ -> hash_slot(key, adapter_meta.keyslot) - end - - adapter_meta.cluster_shards_tab - |> :ets.lookup(:cluster_shards) - |> Enum.reduce_while(nil, fn - {_, start, stop} = slot_id, _acc when hash_slot >= start and hash_slot <= stop -> - {:halt, Pool.get_conn(registry, slot_id, pool_size)} - - _, acc -> - {:cont, acc} + @spec get_conn(adapter_meta, {:"$hash_slot", any} | any, Keyword.t()) :: pid | nil + def get_conn( + %{ + name: name, + keyslot: keyslot, + cluster_shards_tab: cluster_shards_tab, + registry: registry, + pool_size: pool_size + }, + key, + opts + ) do + with_retry(name, Keyword.get(opts, :lock_retries, :infinity), fn -> + {:"$hash_slot", hash_slot} = + case key do + {:"$hash_slot", _} -> + key + + _ -> + hash_slot(key, keyslot) + end + + cluster_shards_tab + |> :ets.lookup(:cluster_shards) + |> Enum.reduce_while(nil, fn + {_, start, stop} = slot_id, _acc when hash_slot >= start and hash_slot <= stop -> + {:halt, Pool.get_conn(registry, slot_id, pool_size)} + + _, acc -> + {:cont, acc} + end) end) end @@ -103,10 +104,12 @@ defmodule NebulexRedisAdapter.RedisCluster do Enum.reduce(enum, %{}, fn {key, _} = entry, acc -> slot = hash_slot(key, keyslot) + Map.put(acc, slot, [entry | Map.get(acc, slot, [])]) key, acc -> slot = hash_slot(key, keyslot) + Map.put(acc, slot, [key | Map.get(acc, slot, [])]) end) end @@ -116,128 +119,69 @@ defmodule NebulexRedisAdapter.RedisCluster do {:"$hash_slot", keyslot.hash_slot(key, @redis_cluster_hash_slots)} end - ## Private Functions - - defp init_cluster_shards_table(name) do - :ets.new(name, [ - :public, - :duplicate_bag, - read_concurrency: true - ]) - end - - defp get_cluster_shards(opts) do - with {:ok, conn, config_endpoint} <- opts |> Connection.conn_opts() |> connect(), - {:ok, cluster_info} <- cluster_info(conn), - command = cluster_command(cluster_info["redis_version"]), - 
{:ok, cluster_info} <- Redix.command(conn, command) do - parse_cluster_info(cluster_info, config_endpoint, opts) - else - {:error, reason} -> exit(reason) - end + @spec get_status(atom, atom) :: atom + def get_status(name, default \\ :locked) when is_atom(name) and is_atom(default) do + name + |> status_key() + |> :persistent_term.get(default) end - defp connect(conn_opts) do - case Keyword.pop(conn_opts, :url) do - {nil, conn_opts} -> - with {:ok, conn} <- Redix.start_link(conn_opts) do - {:ok, conn, conn_opts[:host]} - end - - {url, conn_opts} -> - with {:ok, conn} <- Redix.start_link(url, conn_opts) do - {:ok, conn, URI.parse(url).host} - end - end + @spec put_status(atom, atom) :: :ok + def put_status(name, status) when is_atom(name) and is_atom(status) do + # An atom is a single word so this does not trigger a global GC + name + |> status_key() + |> :persistent_term.put(status) end - defp cluster_info(conn) do - with {:ok, raw_info} <- Redix.command(conn, ["INFO", "server"]) do - cluster_info = - raw_info - |> String.split(["\r\n", "\n"], trim: true) - |> Enum.reduce(%{}, fn str, acc -> - case String.split(str, ":", trim: true) do - [key, value] -> Map.put(acc, key, value) - _other -> acc - end - end) - - {:ok, cluster_info} - end + @spec del_status_key(atom) :: boolean + def del_status_key(name) when is_atom(name) do + # An atom is a single word so this does not trigger a global GC + name + |> status_key() + |> :persistent_term.erase() end - defp cluster_command(<>) do - case Integer.parse(major) do - {v, _} when v >= 7 -> - ["CLUSTER", "SHARDS"] - - _else -> - ["CLUSTER", "SLOTS"] - end + @spec with_retry(atom, pos_integer, (() -> term)) :: term + def with_retry(name, retries, fun) do + with_retry(name, fun, retries, 1) end # coveralls-ignore-start - defp cluster_command(_) do - ["CLUSTER", "SLOTS"] + defp with_retry(_name, fun, max_retries, retries) when retries >= max_retries do + fun.() end # coveralls-ignore-stop - defp parse_cluster_info(config, config_endpoint, opts) do - # Whether the given master host should be overridden with the - # configuration endpoint or not - override? = Keyword.get(opts, :override_master_host, false) - - Enum.reduce(config, [], fn - # Redis version >= 7 (["CLUSTER", "SHARDS"]) - ["slots", [start, stop], "nodes", nodes], acc -> - {host, port} = - for [ - "id", - _, - "port", - port, - "ip", - ip, - "endpoint", - endpoint, - "role", - "master", - "replication-offset", - _, - "health", - "online" - ] <- nodes do - {endpoint || ip, port} - end - |> hd() - - [{start, stop, host, port} | acc] - - # Redis version < 7 (["CLUSTER", "SLOTS"]) - [start, stop, [host, port | _tail] = _master | _replicas], acc -> - [{start, stop, host, port} | acc] - end) - |> Enum.map(fn {start, stop, host, port} -> - {start, stop, maybe_override_host(host, config_endpoint, override?), port} - end) - end + defp with_retry(name, fun, max_retries, retries) do + case get_status(name) do + :ok -> + fun.() - defp maybe_override_host(_host, config_endpoint, true) do - config_endpoint - end + :locked -> + :ok = random_sleep(retries) - # coveralls-ignore-start + with_retry(name, fun, max_retries, retries + 1) - defp maybe_override_host(nil, config_endpoint, _override?) do - config_endpoint + :error -> + raise NebulexRedisAdapter.Error, reason: :redis_cluster_status_error, cache: name + end end - defp maybe_override_host(host, _config_endpoint, _override?) 
do - host - end + ## Private Functions - # coveralls-ignore-stop + # Inline common instructions + @compile {:inline, status_key: 1} + + defp status_key(name), do: {name, :redis_cluster_status} + + defp init_hash_slot_map_table(name) do + :ets.new(name, [ + :public, + :duplicate_bag, + read_concurrency: true + ]) + end end diff --git a/lib/nebulex_redis_adapter/redis_cluster/config_manager.ex b/lib/nebulex_redis_adapter/redis_cluster/config_manager.ex new file mode 100644 index 0000000..caa6830 --- /dev/null +++ b/lib/nebulex_redis_adapter/redis_cluster/config_manager.ex @@ -0,0 +1,300 @@ +defmodule NebulexRedisAdapter.RedisCluster.ConfigManager do + @moduledoc false + + use GenServer + + import Nebulex.Helpers, only: [normalize_module_name: 1] + import NebulexRedisAdapter.Helpers + + alias Nebulex.Telemetry + alias NebulexRedisAdapter.{Connection, RedisCluster} + alias NebulexRedisAdapter.RedisCluster.PoolSupervisor + + ## Internals + + # GenServer State + defstruct adapter_meta: nil, + opts: [], + running_shards: [], + dynamic_sup: nil, + setup_retries: 1 + + ## API + + @spec start_link({Nebulex.Adapter.adapter_meta(), keyword}) :: GenServer.on_start() + def start_link({adapter_meta, opts}) do + name = normalize_module_name([adapter_meta.name, ConfigManager]) + + GenServer.start(__MODULE__, {adapter_meta, opts}, name: name) + end + + @spec setup_shards(name :: atom) :: :ok + def setup_shards(name) do + [name, ConfigManager] + |> normalize_module_name() + |> GenServer.call(:setup_shards) + end + + ## GenServer callbacks + + @impl true + def init({adapter_meta, opts}) do + state = %__MODULE__{ + adapter_meta: adapter_meta, + opts: opts, + dynamic_sup: normalize_module_name([adapter_meta.name, DynamicSupervisor]) + } + + {:ok, state, {:continue, :setup_shards}} + end + + @impl true + def handle_continue(:setup_shards, state) do + do_setup_shards(state) + end + + @impl true + def handle_call(:setup_shards, from, state) do + do_setup_shards(state, fn -> GenServer.reply(from, :ok) end) + end + + @impl true + def handle_info(message, state) + + def handle_info(:timeout, state) do + do_setup_shards(state) + end + + # coveralls-ignore-start + + def handle_info({:DOWN, ref, _type, pid, _reason}, %__MODULE__{running_shards: shards} = state) do + if Enum.member?(shards, {pid, ref}) do + do_setup_shards(state) + else + {:noreply, state} + end + end + + # coveralls-ignore-stop + + @impl true + def terminate(_reason, %__MODULE__{adapter_meta: meta, dynamic_sup: sup, running_shards: lst}) do + # Set cluster status to error + :ok = RedisCluster.put_status(meta.name, :error) + + # Stop running shards/pools (cleanup) + :ok = stop_running_shards(meta.cluster_shards_tab, sup, lst) + end + + ## Private functions + + defp do_setup_shards( + %__MODULE__{adapter_meta: %{name: name}, setup_retries: n} = state, + on_locked \\ fn -> :noop end + ) do + # Lock the cluster + :ok = RedisCluster.put_status(name, :locked) + + # Invoke the on_locked callback + _ = on_locked.() + + # Configure the cluster shards/pools + case configure_shards(state) do + {:ok, running_shards} -> + # Unlock the cluster (status set to ok) + :ok = RedisCluster.put_status(name, :ok) + + {:noreply, %{state | running_shards: running_shards, setup_retries: 1}} + + {:error, _reason} -> + # Set cluster status to error + :ok = RedisCluster.put_status(name, :error) + + {:noreply, %{state | running_shards: [], setup_retries: n + 1}, random_timeout(n)} + end + end + + defp configure_shards(%__MODULE__{ + adapter_meta: adapter_meta, + opts: opts, + 
dynamic_sup: dynamic_sup, + running_shards: running_shards + }) do + metadata = %{ + adapter_meta: adapter_meta, + pid: self(), + status: nil, + reason: nil + } + + Telemetry.span(adapter_meta.telemetry_prefix ++ [:config_manager, :setup], metadata, fn -> + case configure_shards(adapter_meta, dynamic_sup, running_shards, opts) do + {:ok, _} = ok -> + {ok, %{metadata | status: :ok, reason: :succeeded}} + + {:error, reason} -> + # Wrap up the error + error = + wrap_error NebulexRedisAdapter.Error, + reason: {:redis_cluster_setup_error, reason}, + cache: adapter_meta.name + + {error, %{metadata | status: :error, reason: reason}} + end + end) + end + + defp configure_shards( + %{ + cluster_shards_tab: cluster_shards_tab, + registry: registry, + pool_size: pool_size + }, + dynamic_sup, + running_shards, + opts + ) do + # Stop running shards/pools (cleanup) + :ok = stop_running_shards(cluster_shards_tab, dynamic_sup, running_shards) + + # Setup the cluster + with {:ok, specs} <- get_cluster_shards(opts) do + running_shards = + for {start, stop, m_host, m_port} <- specs do + # Define slot id + slot_id = {:cluster_shards, start, stop} + + # Define options + opts = + Keyword.merge(opts, + slot_id: slot_id, + registry: registry, + pool_size: pool_size, + master_host: m_host, + master_port: m_port + ) + + # Define child spec + shard_spec = + Supervisor.child_spec( + {PoolSupervisor, opts}, + type: :supervisor, + id: {PoolSupervisor, slot_id}, + restart: :temporary + ) + + # Start the child for the given shard/pool + {:ok, pid} = DynamicSupervisor.start_child(dynamic_sup, shard_spec) + + # Monitor the process + ref = Process.monitor(pid) + + # Update hash slot map with the started one + true = :ets.insert(cluster_shards_tab, slot_id) + + # Return the child pid with its monitor reference. + {pid, ref} + end + + # Return running shards/pools + {:ok, running_shards} + end + end + + defp stop_running_shards(cluster_shards_tab, dynamic_sup, running_shards) do + # Flush the hash slot map + true = :ets.delete(cluster_shards_tab, :cluster_shards) + + # Stop the running shards/pools + Enum.each(running_shards, &DynamicSupervisor.terminate_child(dynamic_sup, elem(&1, 0))) + end + + defp get_cluster_shards(opts) do + with {:ok, conn, config_endpoint} <- Connection.conn_opts(opts) |> connect(), + {:ok, cluster_info} <- cluster_info(conn) do + {:ok, parse_cluster_info(cluster_info, config_endpoint, opts)} + end + end + + defp connect(conn_opts) do + case Keyword.pop(conn_opts, :url) do + {nil, conn_opts} -> + with {:ok, conn} <- Redix.start_link(conn_opts) do + {:ok, conn, conn_opts[:host]} + end + + {url, conn_opts} -> + with {:ok, conn} <- Redix.start_link(url, conn_opts) do + {:ok, conn, URI.parse(url).host} + end + end + end + + defp cluster_info(conn) do + with {:error, %Redix.Error{}} <- Redix.command(conn, ["CLUSTER", "SHARDS"]) do + Redix.command(conn, ["CLUSTER", "SLOTS"]) + end + end + + defp parse_cluster_info(config, config_endpoint, opts) do + # Whether the given master host should be overridden with the + # configuration endpoint or not + override? 
= Keyword.get(opts, :override_master_host, false) + + Enum.reduce(config, [], fn + # Redis version >= 7 (["CLUSTER", "SHARDS"]) + ["slots", [start, stop], "nodes", nodes], acc -> + for [ + "id", + _, + "port", + port, + "ip", + _, + "endpoint", + endpoint, + "role", + "master", + "replication-offset", + _, + "health", + "online" + ] <- nodes do + {endpoint, port} + end + |> case do + [] -> + # coveralls-ignore-start + acc + + # coveralls-ignore-stop + + [{host, port} | _] -> + [{start, stop, host, port} | acc] + end + + # Redis version < 7 (["CLUSTER", "SLOTS"]) + [start, stop, [host, port | _tail] = _master | _replicas], acc -> + [{start, stop, host, port} | acc] + end) + |> Enum.map(fn {start, stop, host, port} -> + {start, stop, maybe_override_host(host, config_endpoint, override?), port} + end) + end + + defp maybe_override_host(_host, config_endpoint, true) do + config_endpoint + end + + # coveralls-ignore-start + + defp maybe_override_host(nil, config_endpoint, _override?) do + config_endpoint + end + + defp maybe_override_host(host, _config_endpoint, _override?) do + host + end + + # coveralls-ignore-stop +end diff --git a/lib/nebulex_redis_adapter/redis_cluster/dynamic_supervisor.ex b/lib/nebulex_redis_adapter/redis_cluster/dynamic_supervisor.ex new file mode 100644 index 0000000..377eb04 --- /dev/null +++ b/lib/nebulex_redis_adapter/redis_cluster/dynamic_supervisor.ex @@ -0,0 +1,25 @@ +defmodule NebulexRedisAdapter.RedisCluster.DynamicSupervisor do + @moduledoc false + + use DynamicSupervisor + + import Nebulex.Helpers, only: [normalize_module_name: 1] + + ## API + + @spec start_link({Nebulex.Adapter.adapter_meta(), keyword}) :: Supervisor.on_start() + def start_link({adapter_meta, opts}) do + name = normalize_module_name([adapter_meta.name, DynamicSupervisor]) + + DynamicSupervisor.start_link(__MODULE__, opts, name: name) + end + + ## Callbacks + + @impl true + def init(opts) do + max_children = Keyword.get(opts, :max_children, :infinity) + + DynamicSupervisor.init(strategy: :one_for_one, max_children: max_children) + end +end diff --git a/lib/nebulex_redis_adapter/redis_cluster/supervisor.ex b/lib/nebulex_redis_adapter/redis_cluster/pool_supervisor.ex similarity index 94% rename from lib/nebulex_redis_adapter/redis_cluster/supervisor.ex rename to lib/nebulex_redis_adapter/redis_cluster/pool_supervisor.ex index 6988bef..f67128e 100644 --- a/lib/nebulex_redis_adapter/redis_cluster/supervisor.ex +++ b/lib/nebulex_redis_adapter/redis_cluster/pool_supervisor.ex @@ -1,4 +1,4 @@ -defmodule NebulexRedisAdapter.RedisCluster.Supervisor do +defmodule NebulexRedisAdapter.RedisCluster.PoolSupervisor do @moduledoc """ Redis Cluster Node/Slot Supervisor. 
""" diff --git a/mix.exs b/mix.exs index 9f1793e..afb15ea 100644 --- a/mix.exs +++ b/mix.exs @@ -56,7 +56,7 @@ defmodule NebulexRedisAdapter.MixProject do {:telemetry, "~> 0.4 or ~> 1.0", optional: true}, # Test & Code Analysis - {:excoveralls, "~> 0.15", only: :test}, + {:excoveralls, "~> 0.16", only: :test}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.2", only: [:dev, :test], runtime: false}, {:mimic, "~> 1.7", only: :test}, diff --git a/test/docker/cluster/redis-node-0.conf b/test/docker/cluster/redis-node-0.conf index 8fcf631..d52154a 100644 --- a/test/docker/cluster/redis-node-0.conf +++ b/test/docker/cluster/redis-node-0.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/docker/cluster/redis-node-1.conf b/test/docker/cluster/redis-node-1.conf index 98fbc92..4f5e294 100644 --- a/test/docker/cluster/redis-node-1.conf +++ b/test/docker/cluster/redis-node-1.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/docker/cluster/redis-node-2.conf b/test/docker/cluster/redis-node-2.conf index fabfea5..c2bb7c4 100644 --- a/test/docker/cluster/redis-node-2.conf +++ b/test/docker/cluster/redis-node-2.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/docker/cluster/redis-node-3.conf b/test/docker/cluster/redis-node-3.conf index 2dff417..7934ce4 100644 --- a/test/docker/cluster/redis-node-3.conf +++ b/test/docker/cluster/redis-node-3.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/docker/cluster/redis-node-4.conf b/test/docker/cluster/redis-node-4.conf index b81f813..b5bd04f 100644 --- a/test/docker/cluster/redis-node-4.conf +++ b/test/docker/cluster/redis-node-4.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/docker/cluster/redis-node-5.conf b/test/docker/cluster/redis-node-5.conf index 2512d1e..58e85e3 100644 --- a/test/docker/cluster/redis-node-5.conf +++ b/test/docker/cluster/redis-node-5.conf @@ -3,4 +3,4 @@ requirepass password cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 -appendonly yes +appendonly no diff --git a/test/nebulex_redis_adapter/command_test.exs b/test/nebulex_redis_adapter/command_test.exs index 6474fcc..7027f80 100644 --- a/test/nebulex_redis_adapter/command_test.exs +++ b/test/nebulex_redis_adapter/command_test.exs @@ -10,12 +10,13 @@ defmodule NebulexRedisAdapter.CommandTest do |> expect(:get_conn, fn _, _, _ -> self() end) Redix - |> expect(:pipeline, fn _, _ -> {:ok, [%Redix.Error{}]} end) + |> expect(:pipeline, fn _, _, _ -> {:ok, [%Redix.Error{}]} end) assert_raise Redix.Error, fn -> - Command.pipeline!(%{mode: :standalone, name: :test, registry: :test, pool_size: 1}, [ - "PING" - ]) + Command.pipeline!( + %{mode: :standalone, name: :test, registry: :test, pool_size: 1}, + [["PING"]] + ) end end end diff --git a/test/nebulex_redis_adapter/redis_cluster_test.exs b/test/nebulex_redis_adapter/redis_cluster_test.exs index 1a405d8..558811f 100644 --- a/test/nebulex_redis_adapter/redis_cluster_test.exs +++ 
b/test/nebulex_redis_adapter/redis_cluster_test.exs @@ -1,6 +1,9 @@ defmodule NebulexRedisAdapter.RedisClusterTest do use ExUnit.Case, async: true use NebulexRedisAdapter.CacheTest + use Mimic + + import Nebulex.CacheCase, only: [with_telemetry_handler: 3] alias NebulexRedisAdapter.RedisCluster alias NebulexRedisAdapter.TestCache.RedisCluster, as: Cache @@ -12,16 +15,74 @@ defmodule NebulexRedisAdapter.RedisClusterTest do on_exit(fn -> :ok = Process.sleep(100) + if Process.alive?(pid), do: Cache.stop(pid) end) {:ok, cache: Cache, name: Cache} end - describe "connection" do - test "error: connection is closed" do - assert {:error, %Redix.ConnectionError{reason: :closed}} = - RedisClusterConnError.start_link() + describe "cluster setup" do + setup do + start_event = telemetry_event(:redis_cluster_conn_error, :start) + stop_event = telemetry_event(:redis_cluster_conn_error, :stop) + + {:ok, events: [start_event, stop_event]} + end + + test "error: connection with config endpoint cannot be established", %{events: [_, stop]} do + with_telemetry_handler(__MODULE__, [stop], fn -> + {:ok, _pid} = RedisClusterConnError.start_link() + + # 1st failed attempt + assert_receive {^stop, %{duration: _}, %{status: :error}}, 5000 + + # 2dn failed attempt + assert_receive {^stop, %{duration: _}, %{status: :error}}, 5000 + end) + end + + test "error: redis cluster status is set to error", %{events: [start, stop] = events} do + with_telemetry_handler(__MODULE__, events, fn -> + {:ok, _} = + RedisClusterConnError.start_link( + conn_opts: [ + host: "127.0.0.1", + port: 7000 + ] + ) + + assert_receive {^start, _, %{pid: pid}}, 5000 + assert_receive {^stop, %{duration: _}, %{status: :ok}}, 5000 + + :ok = GenServer.stop(pid) + + assert_raise NebulexRedisAdapter.Error, ~r"Redis Cluster is in error status", fn -> + RedisClusterConnError.get("foo") + end + end) + end + + test "error: command failed after reconfiguring cluster", %{events: [_, stop]} do + with_telemetry_handler(__MODULE__, [stop], fn -> + {:ok, _} = + RedisClusterConnError.start_link( + conn_opts: [ + host: "127.0.0.1", + port: 7000 + ] + ) + + assert_receive {^stop, %{duration: _}, %{status: :ok}}, 5000 + + # Setup mocks + NebulexRedisAdapter.RedisCluster + |> expect(:get_conn, fn _, _, _ -> nil end) + + refute RedisClusterConnError.get("foo") + + assert_receive {^stop, %{duration: _}, %{status: :ok}}, 5000 + end) end end @@ -45,13 +106,20 @@ defmodule NebulexRedisAdapter.RedisClusterTest do end test "with put and get operations" do - assert :ok == Cache.put_all(%{"{foo}.1" => "bar1", "{foo}.2" => "bar2"}) - assert %{"{foo}.1" => "bar1", "{foo}.2" => "bar2"} == Cache.get_all(["{foo}.1", "{foo}.2"]) + assert Cache.put_all(%{"{foo}.1" => "bar1", "{foo}.2" => "bar2"}) == :ok + + assert Cache.get_all(["{foo}.1", "{foo}.2"]) == %{"{foo}.1" => "bar1", "{foo}.2" => "bar2"} end end - describe "MOVED error" do - test "when executing a Redis command/pipeline" do + describe "MOVED" do + setup do + stop_event = telemetry_event(:redis_cluster, :stop) + + {:ok, events: [stop_event]} + end + + test "error: raises an exception in the 2nd attempt after reconfiguring the cluster" do _ = Process.flag(:trap_exit, true) {:ok, _pid} = RedisClusterWithKeyslot.start_link() @@ -61,10 +129,40 @@ defmodule NebulexRedisAdapter.RedisClusterTest do RedisClusterWithKeyslot.put("1234567890", "hello") end - # get is executed throughout a Redis pipeline + # put_all is executed throughout a Redis pipeline assert_raise Redix.Error, ~r"MOVED", fn -> - 
RedisClusterWithKeyslot.get("1234567890", "hello") + RedisClusterWithKeyslot.put_all(foo: "bar", bar: "foo") end end + + test "ok: command is successful after configuring the cluster", %{events: [stop] = events} do + with_telemetry_handler(__MODULE__, events, fn -> + # Setup mocks + NebulexRedisAdapter.RedisCluster.Keyslot + |> expect(:hash_slot, fn _, _ -> 0 end) + + # Triggers MOVED error the first time, then the command succeeds + :ok = Cache.put("foo", "bar") + + # Cluster is re-configured + assert_receive {^stop, %{duration: _}, %{status: :ok}}, 5000 + + # Command was executed successfully + assert Cache.get("foo") == "bar" + end) + end + end + + ## Private functions + + defp telemetry_event(cache, event) do + [ + :nebulex_redis_adapter, + :test_cache, + cache, + :config_manager, + :setup, + event + ] end end diff --git a/test/test_helper.exs b/test/test_helper.exs index fdedb73..0464b22 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -28,7 +28,9 @@ end # Mocks [ Redix, - NebulexRedisAdapter.Pool + NebulexRedisAdapter.Pool, + NebulexRedisAdapter.RedisCluster, + NebulexRedisAdapter.RedisCluster.Keyslot ] |> Enum.each(&Mimic.copy/1)
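Since the new configuration manager wraps each cluster setup in a telemetry span (the events the tests above assert on end in `:config_manager, :setup, :start` and `:stop`), an application can observe every (re)configuration cycle, including the ones triggered by `MOVED` redirections. A minimal handler sketch follows; it assumes a cache whose `:telemetry_prefix` resolves to `[:my_app, :redis_cluster_cache]`, and the handler id and module name are hypothetical.

```elixir
defmodule MyApp.RedisClusterObserver do
  @moduledoc false

  require Logger

  def attach do
    :telemetry.attach(
      "redis-cluster-setup-observer",
      [:my_app, :redis_cluster_cache, :config_manager, :setup, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(_event, %{duration: duration}, %{status: status, reason: reason}, _config) do
    # :status is :ok when the hash slot map was rebuilt successfully and
    # :error when the setup failed (failure details are in :reason).
    # :duration is reported in native time units.
    Logger.info(
      "Redis Cluster setup finished: status=#{inspect(status)}, " <>
        "reason=#{inspect(reason)}, duration=#{duration}"
    )
  end
end
```

Calling `MyApp.RedisClusterObserver.attach()` from the application start callback logs every setup cycle, which is the same signal the new tests use to know when the cluster has been re-configured.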