Skip to content

Commit

Permalink
[#49] New Redis Cluster management strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Mar 25, 2023
1 parent 5089969 commit c9a61c8
Show file tree
Hide file tree
Showing 24 changed files with 876 additions and 286 deletions.
12 changes: 10 additions & 2 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# Used by "mix format"
locals_without_parens = [
# Helpers
wrap_error: 1,
wrap_error: 2
]

[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 100,
locals_without_parens: locals_without_parens,
export: [locals_without_parens: locals_without_parens]
]
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ erl_crash.dump
._*
.elixir*
.vs*
*.coverdata
/priv
/nebulex
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ config :my_app, MyApp.RedisClusterCache,
]
```

The pool of connections against the different master nodes is automatically
**IMPORTANT:** The option `:master_nodes` has been removed in favor of the
new cluster management strategy. Instead of `:master_nodes`, you just need
to configure the `:conn_opts` pointing to your configuration endpoint
(which could be one of the master nodes). See
[Redis Cluster options](https://hexdocs.pm/nebulex_redis_adapter/NebulexRedisAdapter.html#module-redis-cluster-options)

The pool of connections to the different master nodes is automatically
configured by the adapter once it gets the cluster slots info.

> This one could be the easiest and recommended way for distributed caching
Expand Down
2 changes: 2 additions & 0 deletions coveralls.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"skip_files": [
"lib/nebulex_redis_adapter/exceptions.ex",
"lib/nebulex_redis_adapter/helpers.ex",
"test/*"
],

Expand Down
151 changes: 93 additions & 58 deletions lib/nebulex_redis_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ defmodule NebulexRedisAdapter do
provides a simple client-side cluster implementation based on
Sharding distribution model via `:client_side_cluster` mode.
## Shared Options
## Configuration options
In addition to `Nebulex.Cache` shared options, this adapters supports the
In addition to `Nebulex.Cache` config options, the adapter supports the
following options:
* `:mode` - Defines the mode Redis will be set up. It can be one of the
Expand All @@ -37,6 +37,22 @@ defmodule NebulexRedisAdapter do
* `:serializer` - Custom serializer module implementing the
`NebulexRedisAdapter.Serializer` behaviour.
## Shared runtime options
Since the adapter runs on top of `Redix`, all commands accept their options
(e.g.: `:timeout`, and `:telemetry_metadata`). See `Redix` docs for more
information.
Additionally, the adapter support the following options for all commands too:
* `:lock_retries` - This option is specific to the `:redis_cluster` mode.
When the config manager is running and setting up the hash slot map,
all Redis commands get blocked until the cluster is properly configured
and the hash slot map is ready to use. This option defines the max retry
attempts to acquire the lock and execute the command in case the
configuration manager is running and all commands are locked..
Defaults to `:infinity`.
## Telemetry events
This adapter emits the recommended Telemetry events.
Expand Down Expand Up @@ -104,7 +120,13 @@ defmodule NebulexRedisAdapter do
password: "password"
]
### Redis Cluster Options
**IMPORTANT:** The option `:master_nodes` has been removed in favor of the
new cluster management strategy. Instead of `:master_nodes`, you just need
to configure the `:conn_opts` pointing to your configuration endpoint
(which could be one of the master nodes). See the **"Redis Cluster options"**
next.
### Redis Cluster options
In addition to shared options, `:redis_cluster` mode supports the following
options:
Expand Down Expand Up @@ -474,32 +496,33 @@ defmodule NebulexRedisAdapter do
## Nebulex.Adapter.Entry

@impl true
defspan get(adapter_meta, key, _opts) do
defspan get(adapter_meta, key, opts) do
%{
serializer: serializer,
encode_key_opts: enc_key_opts,
decode_value_opts: dec_value_opts
} = adapter_meta

adapter_meta
|> Command.exec!(["GET", serializer.encode_key(key, enc_key_opts)], key)
|> Command.exec!(["GET", serializer.encode_key(key, enc_key_opts)], key, opts)
|> serializer.decode_value(dec_value_opts)
end

@impl true
defspan get_all(adapter_meta, keys, _opts) do
do_get_all(adapter_meta, keys)
defspan get_all(adapter_meta, keys, opts) do
do_get_all(adapter_meta, keys, opts)
end

defp do_get_all(%{mode: :standalone} = adapter_meta, keys) do
mget(nil, adapter_meta, keys)
defp do_get_all(%{mode: :standalone} = adapter_meta, keys, opts) do
mget(nil, adapter_meta, keys, opts)
end

defp do_get_all(adapter_meta, keys) do
defp do_get_all(adapter_meta, keys, opts) do
keys
|> group_keys_by_hash_slot(adapter_meta)
|> Enum.reduce(%{}, fn {hash_slot, keys}, acc ->
return = mget(hash_slot, adapter_meta, keys)
return = mget(hash_slot, adapter_meta, keys, opts)

Map.merge(acc, return)
end)
end
Expand All @@ -511,12 +534,14 @@ defmodule NebulexRedisAdapter do
encode_key_opts: enc_key_opts,
decode_value_opts: dec_value_opts
} = adapter_meta,
keys
keys,
opts
) do
adapter_meta
|> Command.exec!(
["MGET" | Enum.map(keys, &serializer.encode_key(&1, enc_key_opts))],
hash_slot_key
hash_slot_key,
opts
)
|> Enum.reduce({keys, %{}}, fn
nil, {[_key | keys], acc} ->
Expand All @@ -529,7 +554,7 @@ defmodule NebulexRedisAdapter do
end

@impl true
defspan put(adapter_meta, key, value, ttl, on_write, _opts) do
defspan put(adapter_meta, key, value, ttl, on_write, opts) do
%{
serializer: serializer,
encode_key_opts: enc_key_opts,
Expand All @@ -540,25 +565,25 @@ defmodule NebulexRedisAdapter do
redis_v = serializer.encode_value(value, enc_value_opts)
cmd_opts = cmd_opts(action: on_write, ttl: fix_ttl(ttl))

case Command.exec!(adapter_meta, ["SET", redis_k, redis_v | cmd_opts], key) do
case Command.exec!(adapter_meta, ["SET", redis_k, redis_v | cmd_opts], key, opts) do
"OK" -> true
nil -> false
end
end

@impl true
defspan put_all(adapter_meta, entries, ttl, on_write, _opts) do
defspan put_all(adapter_meta, entries, ttl, on_write, opts) do
ttl = fix_ttl(ttl)

case adapter_meta.mode do
:standalone ->
do_put_all(adapter_meta, nil, entries, ttl, on_write)
do_put_all(adapter_meta, nil, entries, ttl, on_write, opts)

_ ->
entries
|> group_keys_by_hash_slot(adapter_meta)
|> Enum.reduce(:ok, fn {hash_slot, group}, acc ->
acc && do_put_all(adapter_meta, hash_slot, group, ttl, on_write)
acc && do_put_all(adapter_meta, hash_slot, group, ttl, on_write, opts)
end)
end
end
Expand All @@ -572,7 +597,8 @@ defmodule NebulexRedisAdapter do
hash_slot,
entries,
ttl,
on_write
on_write,
opts
) do
cmd =
case on_write do
Expand All @@ -593,7 +619,7 @@ defmodule NebulexRedisAdapter do
end)

adapter_meta
|> Command.pipeline!([Enum.reverse(mset) | expire], hash_slot)
|> Command.pipeline!([Enum.reverse(mset) | expire], hash_slot, opts)
|> hd()
|> case do
"OK" -> :ok
Expand All @@ -603,17 +629,17 @@ defmodule NebulexRedisAdapter do
end

@impl true
defspan delete(adapter_meta, key, _opts) do
_ = Command.exec!(adapter_meta, ["DEL", enc_key(adapter_meta, key)], key)
defspan delete(adapter_meta, key, opts) do
_ = Command.exec!(adapter_meta, ["DEL", enc_key(adapter_meta, key)], key, opts)

:ok
end

@impl true
defspan take(adapter_meta, key, _opts) do
defspan take(adapter_meta, key, opts) do
redis_k = enc_key(adapter_meta, key)

with_pipeline(adapter_meta, key, [["GET", redis_k], ["DEL", redis_k]])
with_pipeline(adapter_meta, key, [["GET", redis_k], ["DEL", redis_k]], opts)
end

@impl true
Expand Down Expand Up @@ -667,67 +693,74 @@ defmodule NebulexRedisAdapter do
end

@impl true
defspan update_counter(adapter_meta, key, incr, ttl, default, _opts) do
do_update_counter(adapter_meta, key, incr, ttl, default)
defspan update_counter(adapter_meta, key, incr, ttl, default, opts) do
do_update_counter(adapter_meta, key, incr, ttl, default, opts)
end

defp do_update_counter(adapter_meta, key, incr, :infinity, default) do
defp do_update_counter(adapter_meta, key, incr, :infinity, default, opts) do
redis_k = enc_key(adapter_meta, key)

adapter_meta
|> maybe_incr_default(key, redis_k, default)
|> Command.exec!(["INCRBY", redis_k, incr], key)
|> maybe_incr_default(key, redis_k, default, opts)
|> Command.exec!(["INCRBY", redis_k, incr], key, opts)
end

defp do_update_counter(adapter_meta, key, incr, ttl, default) do
defp do_update_counter(adapter_meta, key, incr, ttl, default, opts) do
redis_k = enc_key(adapter_meta, key)

adapter_meta
|> maybe_incr_default(key, redis_k, default)
|> Command.pipeline!([["INCRBY", redis_k, incr], ["EXPIRE", redis_k, fix_ttl(ttl)]], key)
|> maybe_incr_default(key, redis_k, default, opts)
|> Command.pipeline!(
[["INCRBY", redis_k, incr], ["EXPIRE", redis_k, fix_ttl(ttl)]],
key,
opts
)
|> hd()
end

defp maybe_incr_default(adapter_meta, key, redis_k, default)
defp maybe_incr_default(adapter_meta, key, redis_k, default, opts)
when is_integer(default) and default > 0 do
case Command.exec!(adapter_meta, ["EXISTS", redis_k], key) do
case Command.exec!(adapter_meta, ["EXISTS", redis_k], key, opts) do
1 ->
adapter_meta

0 ->
_ = Command.exec!(adapter_meta, ["INCRBY", redis_k, default], key)
_ = Command.exec!(adapter_meta, ["INCRBY", redis_k, default], key, opts)

adapter_meta
end
end

defp maybe_incr_default(adapter_meta, _, _, _), do: adapter_meta
defp maybe_incr_default(adapter_meta, _, _, _, _) do
adapter_meta
end

## Nebulex.Adapter.Queryable

@impl true
defspan execute(adapter_meta, operation, query, _opts) do
do_execute(adapter_meta, operation, query)
defspan execute(adapter_meta, operation, query, opts) do
do_execute(adapter_meta, operation, query, opts)
end

defp do_execute(%{mode: mode} = adapter_meta, :count_all, nil) do
exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)])
defp do_execute(%{mode: mode} = adapter_meta, :count_all, nil, opts) do
exec!(mode, [adapter_meta, ["DBSIZE"], opts], [0, &Kernel.+(&2, &1)])
end

defp do_execute(%{mode: mode} = adapter_meta, :delete_all, nil) do
size = exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)])
_ = exec!(mode, [adapter_meta, ["FLUSHDB"]], [])
defp do_execute(%{mode: mode} = adapter_meta, :delete_all, nil, opts) do
size = exec!(mode, [adapter_meta, ["DBSIZE"], opts], [0, &Kernel.+(&2, &1)])
_ = exec!(mode, [adapter_meta, ["FLUSHDB"], opts], [])

size
end

defp do_execute(%{mode: :standalone} = adapter_meta, :delete_all, {:in, keys})
defp do_execute(%{mode: :standalone} = adapter_meta, :delete_all, {:in, keys}, opts)
when is_list(keys) do
_ = Command.exec!(adapter_meta, ["DEL" | Enum.map(keys, &enc_key(adapter_meta, &1))])
_ = Command.exec!(adapter_meta, ["DEL" | Enum.map(keys, &enc_key(adapter_meta, &1))], opts)

length(keys)
end

defp do_execute(adapter_meta, :delete_all, {:in, keys})
defp do_execute(adapter_meta, :delete_all, {:in, keys}, opts)
when is_list(keys) do
:ok =
keys
Expand All @@ -736,22 +769,23 @@ defmodule NebulexRedisAdapter do
Command.exec!(
adapter_meta,
["DEL" | Enum.map(keys_group, &enc_key(adapter_meta, &1))],
hash_slot
hash_slot,
opts
)
end)

length(keys)
end

defp do_execute(adapter_meta, :all, query) do
execute_query(query, adapter_meta)
defp do_execute(adapter_meta, :all, query, opts) do
execute_query(query, adapter_meta, opts)
end

@impl true
defspan stream(adapter_meta, query, _opts) do
defspan stream(adapter_meta, query, opts) do
Stream.resource(
fn ->
execute_query(query, adapter_meta)
execute_query(query, adapter_meta, opts)
end,
fn
[] -> {:halt, []}
Expand All @@ -773,10 +807,11 @@ defmodule NebulexRedisAdapter do
defp with_pipeline(
%{serializer: serializer, decode_value_opts: dec_val_opts} = adapter_meta,
key,
pipeline
pipeline,
opts
) do
adapter_meta
|> Command.pipeline!(pipeline, key)
|> Command.pipeline!(pipeline, key, opts)
|> hd()
|> serializer.decode_value(dec_val_opts)
end
Expand All @@ -797,17 +832,17 @@ defmodule NebulexRedisAdapter do
"expected ttl: to be an integer >= 1000 or :infinity, got: #{inspect(ttl)}"
end

defp execute_query(nil, %{serializer: serializer} = adapter_meta) do
defp execute_query(nil, %{serializer: serializer} = adapter_meta, opts) do
"*"
|> execute_query(adapter_meta)
|> execute_query(adapter_meta, opts)
|> Enum.map(&serializer.decode_key/1)
end

defp execute_query(pattern, %{mode: mode} = adapter_meta) when is_binary(pattern) do
exec!(mode, [adapter_meta, ["KEYS", pattern]], [[], &Kernel.++(&1, &2)])
defp execute_query(pattern, %{mode: mode} = adapter_meta, opts) when is_binary(pattern) do
exec!(mode, [adapter_meta, ["KEYS", pattern], opts], [[], &Kernel.++(&1, &2)])
end

defp execute_query(pattern, _adapter_meta) do
defp execute_query(pattern, _adapter_meta, _opts) do
raise Nebulex.QueryError, message: "invalid pattern", query: pattern
end

Expand Down
Loading

0 comments on commit c9a61c8

Please sign in to comment.