-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchatter.ex
257 lines (214 loc) · 6.84 KB
/
chatter.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
defmodule Chatter do
@moduledoc """
`Chatter` allows broadcasting information between a set of nodes. Nodes are identified by
the `Chatter.NetID` record, which contains an IPv4 address and a port.
TODO
"""
use Application
require Logger
require Chatter.NetID
require Chatter.BroadcastID
require Chatter.Gossip
alias Chatter.NetID
alias Chatter.BroadcastID
alias Chatter.MulticastHandler
alias Chatter.OutgoingSupervisor
alias Chatter.PeerDB
alias Chatter.SerializerDB
alias Chatter.Gossip
alias Chatter.Planner
def start(_type, args)
do
:random.seed(:os.timestamp)
Chatter.Supervisor.start_link(args)
end
@doc """
TODO
"""
@spec broadcast(Gossip.t) :: :ok
def broadcast(gossip)
when Gossip.is_valid(gossip)
do
broadcast(Gossip.distribution_list(gossip), Gossip.payload(gossip))
end
@doc """
TODO
"""
@spec broadcast(list(NetID.t), tuple) :: :ok
def broadcast(distribution_list, tup)
when is_list(distribution_list) and
is_tuple(tup) and
tuple_size(tup) > 1
do
:ok = NetID.validate_list(distribution_list)
# verify that the payload serializer has already registered
{:ok, _handler} = SerializerDB.get_(tup)
own_id = Chatter.local_netid
db_pid = PeerDB.locate!
{:ok, seqno} = PeerDB.inc_broadcast_seqno(db_pid)
{:ok, seen_ids} = PeerDB.get_senders(db_pid)
peers_seen_us = case PeerDB.get_seen_id_list_(own_id) do
{:ok, peer_list} ->
peer_list |> Enum.reduce([], fn(x,acc) -> [BroadcastID.origin(x)|acc] end)
{:error, :not_found} -> []
end
other_peers = peers
|> Planner.group_mcast_peers
|> Enum.take_random(4)
|> Enum.map(fn(x) -> hd(x) end)
gossip = Gossip.new(own_id, seqno, tup)
|> Gossip.distribution_list(distribution_list)
|> Gossip.seen_ids(seen_ids)
|> Gossip.other_ids(other_peers)
|> Gossip.remove_from_distribution_list(peers_seen_us)
## Logger.debug "multicasting [#{inspect gossip}]"
# multicast first
:ok = MulticastHandler.send(MulticastHandler.locate!, gossip)
# add 1 random element to the distribution list from the original
# distribution list
gossip =
Gossip.add_to_distribution_list(gossip,
Enum.take_random(distribution_list, 1))
# outgoing handler uses its already open channels and returns the gossip
# what couldn't be delivered
:ok = OutgoingSupervisor.broadcast(gossip, Chatter.group_manager_key)
# make sure all IDs are stored in the PeerDB
PeerDB.add(db_pid, distribution_list)
end
@doc """
Return the list of peers `Chatter` has ever seen. The list omits the local
`NetID` even though PeerDB has an entry for it.
```
iex(1)> Chatter.peers
[{:net_id, {192, 168, 1, 100}, 29999}]
```
"""
def peers()
do
my_id = local_netid
PeerDB.get_peers_() |> Enum.filter(fn(x) -> x != my_id end)
end
@doc """
Returns the local IPv4 address in the form of a tuple.
```
iex(1)> Chatter.get_local_ip
{192, 168, 1, 100}
```
"""
def get_local_ip
do
{:ok, list} = :inet.getif
[{ip, _broadcast, _netmask}] = list
|> Enum.filter( fn({_ip, bcast, _nm}) -> bcast != :undefined end)
|> Enum.take(1)
ip
end
@doc """
Returns the local node's `NetID`. This function uses the following configuration values:
- :chatter / :my_addr
- :chatter / :my_port
If none of these are available, the local IPv4 address will be determined by
the `Chatter.get_local_ip` function and the port will be defaulted to `29999`.
```
iex(1)> Chatter.local_netid
{:net_id, {192, 168, 1, 100}, 29998}
```
"""
def local_netid
do
# try to figure our local IP if not given
case Application.fetch_env(:chatter, :my_addr) do
{:ok, nil} ->
my_addr = get_local_ip()
{:ok, my_addr_str} ->
{:ok, my_addr} = my_addr_str |> String.to_char_list |> :inet_parse.address
_ ->
my_addr = get_local_ip()
end
my_port = case Application.fetch_env(:chatter, :my_port)
do
{:ok, val} ->
{my_port, ""} = val |> Integer.parse
my_port
:error ->
Logger.info "no my_port config value found for group_manager Application [default: 29999]"
29999
end
NetID.new(my_addr, my_port)
end
@doc """
Returns the local node's UDP multicast `NetID`. This function uses the following configuration values:
- :chatter / :multicast_addr
- :chatter / :multicast_port
If none of these are available, the UDP multicast address will be `224.1.1.1` by default
and the port will be defaulted to `29999`.
```
iex(1)> Chatter.multicast_netid
{:net_id, {224, 1, 1, 1}, 29999}
```
"""
def multicast_netid
do
mcast_addr_str = case Application.fetch_env(:chatter, :multicast_addr)
do
{:ok, val} ->
val
:error ->
Logger.info "no multicast_addr config value found for group_manager Application [default: 224.1.1.1]"
"224.1.1.1"
end
mcast_port_str = case Application.fetch_env(:chatter, :multicast_port)
do
{:ok, val} ->
val
:error ->
Logger.info "no multicast_port config value found for group_manager Application [default: 29999]"
"29999"
end
{:ok, multicast_addr} = mcast_addr_str |> String.to_char_list |> :inet_parse.address
{multicast_port, ""} = mcast_port_str |> Integer.parse
NetID.new(multicast_addr, multicast_port)
end
@doc """
Returns the local node's UDP multicast TTL value. This function uses the following configuration value:
- :chatter / :multicast_ttl
If no confifuration value is available, the default is `4`.
```
iex(1)> Chatter.multicast_ttl
4
```
"""
def multicast_ttl
do
case Application.fetch_env(:chatter, :multicast_ttl)
do
{:ok, mcast_ttl_str} ->
{multicast_ttl, ""} = mcast_ttl_str |> Integer.parse
multicast_ttl
:error ->
Logger.info "no multicast_ttl config value found for group_manager Application [default: 4]"
4
end
end
@doc """
Returns the local node's encryption key. This function uses the following configuration value:
- :chatter / :key
The encryption key needs to be 32 characters long. The longer key will be chopped, the shorter key
will be concatenated with `01234567890123456789012345678901` and then chopped to 32 characters.
"""
def group_manager_key
do
case Application.fetch_env(:chatter, :key)
do
{:ok, key} when is_binary(key) and byte_size(key) == 32->
key
:error ->
Logger.error "no 'key' config value found for group_manager Application"
"01234567890123456789012345678901"
{:ok, key} ->
Logger.error "'key' has to be 32 bytes long for group_manager Application"
<< retval :: binary-size(32), _rest :: binary >> = key <> "01234567890123456789012345678901"
retval
end
end
end