-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathorganization_worker.ex
65 lines (49 loc) · 1.22 KB
/
organization_worker.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
defmodule Sender.OrganizationWorker do
use GenServer
@concurrency 3
# Client
def start_link(org_id) do
state = %{
org_id: org_id,
queue: :queue.new()
}
GenServer.start_link(__MODULE__, state, name: via_tuple(org_id))
end
def enqueue(org_id, f) do
GenServer.cast(via_tuple(org_id), {:enqueue, f})
end
# Server
@impl true
def init(stack) do
schedule_work()
{:ok, stack}
end
@impl true
def handle_cast({:enqueue, f}, %{queue: queue} = state) do
new_queue = :queue.in(f, queue)
new_state = Map.put(state, :queue, new_queue)
{:noreply, new_state}
end
@impl true
def handle_info(:process_queue, %{queue: queue} = state) do
new_queue =
Enum.reduce(1..@concurrency, queue, fn _, new_queue ->
{value, new_queue} = :queue.out(new_queue)
case value do
{:value, f} -> f.()
:empty -> nil
end
new_queue
end)
schedule_work()
new_state = Map.put(state, :queue, new_queue)
{:noreply, new_state}
end
# Private
defp via_tuple(org_id) do
{:via, Registry, {Registry.Organizations, org_id}}
end
defp schedule_work() do
Process.send_after(self(), :process_queue, 5_000)
end
end