Skip to content

Commit

Permalink
Implement KV impl using ETS to avoid GenServer bottlenecks
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Manzanera committed Feb 9, 2021
1 parent 255b562 commit 62933a1
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 123 deletions.
222 changes: 114 additions & 108 deletions lib/uniris/db/key_value_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ defmodule Uniris.DB.KeyValueImpl do

@behaviour DBImpl

@db_name :kv_db
@transaction_db_name :uniris_kv_db_transactions
@chain_db_name :uniris_kv_db_chain
@chain_lookup_db_name :uniris_kv_db_chain_lookup
@beacon_slot_db_name :uniris_kv_db_beacon_slot
@beacon_slots_db_name :uniris_kv_db_beacon_slots
@beacon_summary_db_name :uniris_kv_db_beacon_summary

require Logger

@doc """
Initialize the KV store
Expand All @@ -30,18 +37,18 @@ defmodule Uniris.DB.KeyValueImpl do
@spec get_transaction(binary(), fields :: list()) ::
{:ok, Transaction.t()} | {:error, :transaction_not_exists}
def get_transaction(address, fields \\ []) when is_binary(address) and is_list(fields) do
case CubDB.get(get_db(), {"transaction", address}) do
nil ->
case :ets.lookup(@transaction_db_name, address) do
[] ->
{:error, :transaction_not_exists}

tx ->
tx =
[{_, tx}] ->
filter_tx =
tx
|> Transaction.to_map()
|> Utils.take_in(fields)
|> Transaction.from_map()

{:ok, tx}
{:ok, filter_tx}
end
end

Expand All @@ -51,26 +58,15 @@ defmodule Uniris.DB.KeyValueImpl do
"""
@spec get_transaction_chain(binary(), list()) :: Enumerable.t()
def get_transaction_chain(address, fields \\ []) when is_binary(address) and is_list(fields) do
db = get_db()

Stream.resource(
fn -> CubDB.get(db, {"chain", address}) end,
fn -> :ets.lookup(@chain_db_name, address) end,
fn
nil ->
{:halt, []}
[{_, address} | rest] ->
{:ok, tx} = get_transaction(address, fields)
{[tx], rest}

[] ->
_ ->
{:halt, []}

[address | rest] ->
tx =
db
|> CubDB.get({"transaction", address})
|> Transaction.to_map()
|> Utils.take_in(fields)
|> Transaction.from_map()

{[tx], rest}
end,
fn _ -> :ok end
)
Expand All @@ -82,7 +78,8 @@ defmodule Uniris.DB.KeyValueImpl do
"""
@spec write_transaction(Transaction.t()) :: :ok
def write_transaction(tx = %Transaction{address: address}) do
:ok = CubDB.put(get_db(), {"transaction", address}, tx)
true = :ets.insert(@transaction_db_name, {address, tx})
:ok
end

@impl DBImpl
Expand All @@ -91,20 +88,13 @@ defmodule Uniris.DB.KeyValueImpl do
"""
@spec write_transaction_chain(Enumerable.t()) :: :ok
def write_transaction_chain(chain) do
db = get_db()

%Transaction{address: chain_address} = Enum.at(chain, 0)

chain
|> Stream.each(&CubDB.put(db, {"transaction", &1.address}, &1))
Stream.each(chain, fn tx = %Transaction{address: address} ->
true = :ets.insert(@chain_db_name, {chain_address, address})
:ok = write_transaction(tx)
end)
|> Stream.run()

transaction_addresses =
chain
|> Stream.map(& &1.address)
|> Enum.to_list()

:ok = CubDB.put(db, {"chain", chain_address}, transaction_addresses)
end

@doc """
Expand All @@ -113,7 +103,8 @@ defmodule Uniris.DB.KeyValueImpl do
@impl DBImpl
@spec add_last_transaction_address(binary(), binary()) :: :ok
def add_last_transaction_address(tx_address, last_address) do
:ok = CubDB.put(get_db(), {"chain_lookup", tx_address}, last_address)
true = :ets.insert(@chain_lookup_db_name, {{:last_transaction, tx_address}, last_address})
:ok
end

@doc """
Expand All @@ -122,15 +113,9 @@ defmodule Uniris.DB.KeyValueImpl do
@impl DBImpl
@spec list_last_transaction_addresses() :: Enumerable.t()
def list_last_transaction_addresses do
{:ok, lookup} =
CubDB.select(get_db(),
pipe: [
filter: fn {key, _} -> match?({"chain_lookup", _}, key) end,
map: fn {{"chain_lookup", address}, last_address} -> {address, last_address} end
]
)

lookup
:ets.select(@chain_lookup_db_name, [
{{{:last_transaction, :"$1"}, :"$2"}, [], [{{:"$1", :"$2"}}]}
])
end

@impl DBImpl
Expand All @@ -139,43 +124,30 @@ defmodule Uniris.DB.KeyValueImpl do
"""
@spec list_transactions(list()) :: Enumerable.t()
def list_transactions(fields \\ []) when is_list(fields) do
{:ok, txs} =
CubDB.select(get_db(),
pipe: [
filter: fn {key, _} ->
match?({"transaction", _}, key)
end,
map: fn {_, tx} ->
tx
|> Transaction.to_map()
|> Utils.take_in(fields)
|> Transaction.from_map()
end
]
)

txs
@transaction_db_name
|> ets_table_keys()
|> Stream.map(fn address ->
{:ok, tx} = get_transaction(address, fields)
tx
end)
end

@impl DBImpl
@spec get_beacon_slots(binary(), DateTime.t()) :: Enumerable.t()
def get_beacon_slots(subset, from_date = %DateTime{}) when is_binary(subset) do
Stream.resource(
fn -> CubDB.get(get_db(), {:beacon_slots, subset}) end,
fn -> :ets.lookup(@beacon_slots_db_name, subset) end,
fn
nil ->
{:halt, []}

[] ->
{:halt, []}

[time | rest] ->
if DateTime.compare(from_date, time) == :gt do
slot = CubDB.get(get_db(), {:beacon_slot, subset, time})
[{_, slot_time} | rest] ->
if DateTime.compare(from_date, slot_time) == :gt do
{:ok, slot} = get_beacon_slot(subset, slot_time)
{[slot], rest}
else
{[], rest}
end

_ ->
{:halt, []}
end,
fn _ -> :ok end
)
Expand All @@ -184,65 +156,45 @@ defmodule Uniris.DB.KeyValueImpl do
@impl DBImpl
@spec get_beacon_slot(binary(), DateTime.t()) :: {:ok, Slot.t()} | {:error, :not_found}
def get_beacon_slot(subset, date = %DateTime{}) when is_binary(subset) do
case CubDB.get(get_db(), {:beacon_slot, subset, date}) do
nil ->
case :ets.lookup(@beacon_slot_db_name, {subset, date}) do
[] ->
{:error, :not_found}

slot ->
[{_, slot}] ->
{:ok, slot}
end
end

@impl DBImpl
@spec get_beacon_summary(binary(), DateTime.t()) :: {:ok, Summary.t()} | {:error, :not_found}
def get_beacon_summary(subset, date = %DateTime{}) when is_binary(subset) do
case CubDB.get(get_db(), {:beacon_summary, subset, date}) do
nil ->
case :ets.lookup(@beacon_summary_db_name, {subset, date}) do
[] ->
{:error, :not_found}

summary ->
[{_, summary}] ->
{:ok, summary}
end
end

@impl DBImpl
def register_beacon_slot(slot = %Slot{subset: subset, slot_time: slot_time}) do
:ok = CubDB.put(get_db(), {:beacon_slot, subset, slot_time}, slot)
:ok = CubDB.update(get_db(), {:beacon_slots, subset}, [slot_time], &[slot_time | &1])
true = :ets.insert(@beacon_slot_db_name, {{subset, slot_time}, slot})
true = :ets.insert(@beacon_slots_db_name, {subset, slot_time})
:ok
end

@impl DBImpl
def register_beacon_summary(summary = %Summary{subset: subset, summary_time: summary_time}) do
:ok = CubDB.put(get_db(), {:beacon_summary, subset, summary_time}, summary)
:ok = CubDB.put(get_db(), {:beacon_slots, subset}, [])

clean_beacon_slots(subset, summary_time)
:ok
end
true = :ets.insert(@beacon_summary_db_name, {{subset, summary_time}, summary})

defp clean_beacon_slots(subset, summary_time) do
keys_to_delete =
case CubDB.get(get_db(), {:beacon_slots, subset}) do
nil ->
[]
:ets.lookup(@beacon_slots_db_name, subset)
|> Enum.filter(&(DateTime.compare(summary_time, elem(&1, 1)) == :gt))
|> Enum.each(&:ets.delete(@beacon_slot_db_name, &1))

times ->
times
|> Enum.filter(&(DateTime.compare(summary_time, &1) == :gt))
|> Enum.map(&{:beacon_slot, subset, &1})
end
:ets.delete(@beacon_slots_db_name, subset)

case keys_to_delete do
[] ->
:ok

_ ->
Task.start(fn ->
Process.sleep(60_000)
CubDB.delete_multi(get_db(), keys_to_delete)
end)
end
:ok
end

@impl DBImpl
Expand All @@ -253,11 +205,65 @@ defmodule Uniris.DB.KeyValueImpl do
@impl GenServer
def init(opts) do
root_dir = Keyword.get(opts, :root_dir, Application.app_dir(:uniris, "priv/storage"))
{:ok, db} = CubDB.start_link(root_dir)
dump_delay = Keyword.get(opts, :dump_delay, 0)

File.mkdir_p!(root_dir)

init_table(root_dir, @transaction_db_name, :set)
init_table(root_dir, @chain_db_name, :bag)
init_table(root_dir, @chain_lookup_db_name, :set)
init_table(root_dir, @beacon_slot_db_name, :set)
init_table(root_dir, @beacon_slots_db_name, :bag)
init_table(root_dir, @beacon_summary_db_name, :set)

Process.send_after(self(), :dump, dump_delay)
{:ok, %{root_dir: root_dir, dump_delay: dump_delay}}
end

@impl GenServer
def handle_info(:dump, state = %{root_dir: root_dir, dump_delay: dump_delay}) do
Enum.each(
[
@transaction_db_name,
@chain_db_name,
@chain_lookup_db_name,
@beacon_slot_db_name,
@beacon_slots_db_name,
@beacon_summary_db_name
],
fn table_name ->
filepath = table_dump_file(root_dir, table_name) |> String.to_charlist()
:ets.tab2file(table_name, filepath)
end
)

Process.send_after(self(), :dump, dump_delay)

:persistent_term.put(@db_name, db)
{:ok, %{db: db}}
{:noreply, state}
end

defp get_db, do: :persistent_term.get(@db_name)
defp init_table(root_dir, table_name, type) do
table_filename = table_dump_file(root_dir, table_name)

unless File.exists?(table_filename) do
:ets.new(table_name, [:named_table, type, :public, read_concurrency: true])
end

:ets.file2tab(String.to_charlist(table_filename))
end

defp table_dump_file(root_dir, table_name) do
Path.join(root_dir, Atom.to_string(table_name))
end

defp ets_table_keys(table_name) do
Stream.resource(
fn -> :ets.first(table_name) end,
fn
:"$end_of_table" -> {:halt, nil}
previous_key -> {[previous_key], :ets.next(table_name, previous_key)}
end,
fn _ -> :ok end
)
end
end
1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ defmodule Uniris.MixProject do
{:stream_data, "~> 0.5.0"},
{:elixir_make, "~> 0.6.0", runtime: false},
{:logger_file_backend, "~> 0.0.11"},
{:cubdb, "~> 0.17.0"},
{:earmark, "~> 1.4"},
{:humanize_time, "~> 1.0"},
{:sizeable, "~> 1.0"},
Expand Down
1 change: 0 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
"credo": {:hex, :credo, "1.5.4", "9914180105b438e378e94a844ec3a5088ae5875626fc945b7c1462b41afc3198", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cf51af45eadc0a3f39ba13b56fdac415c91b34f7b7533a13dc13550277141bc4"},
"crontab": {:hex, :crontab, "1.1.10", "dc9bb1f4299138d47bce38341f5dcbee0aa6c205e864fba7bc847f3b5cb48241", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "1347d889d1a0eda997990876b4894359e34bfbbd688acbb0ba28a2795ca40685"},
"cubdb": {:hex, :cubdb, "0.17.0", "a0a1ce830bc74b94a2e78aea1c529750028e822ae741752245d7f9bdd10c116f", [:mix], [], "hexpm", "60f6845ca1961ac484702bc0430b577695c6d6eec53683b452e616b19d960cde"},
"db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"},
"decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"},
"dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"},
Expand Down
Loading

0 comments on commit 62933a1

Please sign in to comment.