From 53de0246dd1b9ba0c2d709e18e168780b4f716ba Mon Sep 17 00:00:00 2001 From: Andreas Schultz Date: Mon, 15 Apr 2024 10:34:34 +0200 Subject: [PATCH] WIP: experimental socket.erl code and hacks --- src/eradius_client.erl | 52 +++++++------ src/eradius_client_socket.erl | 134 ++++++++++++++++++++++++---------- src/eradius_counter.erl | 31 ++++---- src/eradius_lib.erl | 4 + 4 files changed, 145 insertions(+), 76 deletions(-) diff --git a/src/eradius_client.erl b/src/eradius_client.erl index dfccd133..056ed488 100644 --- a/src/eradius_client.erl +++ b/src/eradius_client.erl @@ -162,28 +162,30 @@ restore_upstream_server({ServerIP, Port, Retries, InitialRetries}) -> proceed_response(Request, {ok, Response, Secret, Authenticator}, _Peer = {_ServerName, {ServerIP, Port}}, TS1, MetricsInfo, Options) -> update_client_request(Request#radius_request.cmd, MetricsInfo, erlang:monotonic_time() - TS1, Request), update_client_responses(MetricsInfo), - case eradius_lib:decode_request(Response, Secret, Authenticator) of - {bad_pdu, "Message-Authenticator Attribute is invalid" = Reason} -> - update_client_response(bad_authenticator, MetricsInfo, Request), - ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), - noreply; - {bad_pdu, "Authenticator Attribute is invalid" = Reason} -> - update_client_response(bad_authenticator, MetricsInfo, Request), - ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), - noreply; - {bad_pdu, "unknown request type" = Reason} -> - update_client_response(unknown_req_type, MetricsInfo, Request), - ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), - noreply; - {bad_pdu, Reason} -> - update_client_response(dropped, MetricsInfo, Request), - ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), - maybe_failover(Request, noreply, {ServerIP, Port}, Options); - Decoded -> - update_server_status_metric(ServerIP, Port, true, Options), - update_client_response(Decoded#radius_request.cmd, MetricsInfo, Request), - {ok, Response, Authenticator} - end; + {ok, Response, Authenticator}; + + %% case eradius_lib:decode_request(Response, Secret, Authenticator) of + %% {bad_pdu, "Message-Authenticator Attribute is invalid" = Reason} -> + %% update_client_response(bad_authenticator, MetricsInfo, Request), + %% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), + %% noreply; + %% {bad_pdu, "Authenticator Attribute is invalid" = Reason} -> + %% update_client_response(bad_authenticator, MetricsInfo, Request), + %% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), + %% noreply; + %% {bad_pdu, "unknown request type" = Reason} -> + %% update_client_response(unknown_req_type, MetricsInfo, Request), + %% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), + %% noreply; + %% {bad_pdu, Reason} -> + %% update_client_response(dropped, MetricsInfo, Request), + %% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]), + %% maybe_failover(Request, noreply, {ServerIP, Port}, Options); + %% Decoded -> + %% update_server_status_metric(ServerIP, Port, true, Options), + %% update_client_response(Decoded#radius_request.cmd, MetricsInfo, Request), + %% {ok, Response, Authenticator} + %% end; proceed_response(Request, Response, {_ServerName, {ServerIP, Port}}, TS1, MetricsInfo, Options) -> update_client_responses(MetricsInfo), @@ -404,7 +406,7 @@ configure(State) -> %% private prepare_pools() -> - ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 1}, {write_concurrency,true}]), + ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 1}, {read_concurrency,true}]), lists:foreach(fun({_PoolName, Servers}) -> prepare_pool(Servers) end, application:get_env(eradius, servers_pool, [])), lists:foreach(fun(Server) -> store_upstream_servers(Server) end, application:get_env(eradius, servers, [])), init_server_status_metrics(). @@ -510,7 +512,9 @@ next_port_and_req_id(Peer, NumberOfPorts, Counters) -> NextReqId = 0 end, NewCounters = Counters#{Peer => {NextPortIdx, NextReqId}}, - {NextPortIdx, NextReqId, NewCounters}. + R = {NextPortIdx, NextReqId, NewCounters}, + %% ?LOG(info, "~s: ~p", [?FUNCTION_NAME, R]), + R. find_socket_process(PortIdx, Sockets, SocketIP, Sup) -> case array:get(PortIdx, Sockets) of diff --git a/src/eradius_client_socket.erl b/src/eradius_client_socket.erl index d03ff2b3..f5b0c12e 100644 --- a/src/eradius_client_socket.erl +++ b/src/eradius_client_socket.erl @@ -5,23 +5,37 @@ -export([start/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client, socket, pending, mode, counter}). +-record(state, {client, socket, pending, mode, active_n, counter}). + +-include_lib("kernel/include/logger.hrl"). start(SocketIP, Client, PortIdx) -> gen_server:start_link(?MODULE, [SocketIP, Client, PortIdx], []). init([SocketIP, Client, PortIdx]) -> Client ! {PortIdx, self()}, - case SocketIP of - undefined -> - ExtraOptions = []; - SocketIP when is_tuple(SocketIP) -> - ExtraOptions = [{ip, SocketIP}] - end, - RecBuf = application:get_env(eradius, recbuf, 8192), - SndBuf = application:get_env(eradius, sndbuf, 131072), - {ok, Socket} = gen_udp:open(0, [{active, once}, binary , {recbuf, RecBuf}, {sndbuf, SndBuf} | ExtraOptions]), - {ok, #state{client = Client, socket = Socket, pending = maps:new(), mode = active, counter = 0}}. + RecBuf = application:get_env(eradius, recbuf, 256*1024), + SndBuf = application:get_env(eradius, sndbuf, 256*1024), + + SockAddr = + case SocketIP of + undefined -> any; + _ when is_tuple(SocketIP) -> SocketIP + end, + {ok, Socket} = socket:open(inet, dgram, udp), + ok = socket:bind(Socket, #{family => inet, port => 0, addr => SockAddr}), + ok = socket:setopt(Socket, socket, rcvbuf, RecBuf), + ok = socket:setopt(Socket, socket, sndbuf, SndBuf), + + self() ! {'$socket', Socket, select, undefined}, + + State = #state{client = Client, + socket = Socket, + pending = maps:new(), + mode = active, + active_n = 100, + counter = 0}, + {ok, State}. handle_call(_Request, _From, State) -> {noreply, State}. @@ -29,9 +43,12 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({'$socket', Socket, select, Info}, #state{socket = Socket} = State) -> + handle_input(Socket, Info, State); + handle_info({SenderPid, send_request, {IP, Port}, ReqId, EncRequest}, State = #state{socket = Socket, pending = Pending, counter = Counter}) -> - case gen_udp:send(Socket, IP, Port, EncRequest) of + case socket:sendto(Socket, EncRequest, #{family => inet, port => Port, addr => IP}) of ok -> ReqKey = {IP, Port, ReqId}, NPending = maps:put(ReqKey, SenderPid, Pending), @@ -41,31 +58,6 @@ handle_info({SenderPid, send_request, {IP, Port}, ReqId, EncRequest}, {noreply, State} end; -handle_info({udp, Socket, FromIP, FromPort, EncRequest}, - State = #state{socket = Socket, pending = Pending, mode = Mode, counter = Counter}) -> - case eradius_lib:decode_request_id(EncRequest) of - {ReqId, EncRequest} -> - case maps:find({FromIP, FromPort, ReqId}, Pending) of - error -> - %% discard reply because we didn't expect it - inet:setopts(Socket, [{active, once}]), - {noreply, State}; - {ok, WaitingSender} -> - WaitingSender ! {self(), response, ReqId, EncRequest}, - inet:setopts(Socket, [{active, once}]), - NPending = maps:remove({FromIP, FromPort, ReqId}, Pending), - NState = State#state{pending = NPending, counter = Counter-1}, - case {Mode, Counter-1} of - {inactive, 0} -> {stop, normal, NState}; - _ -> {noreply, NState} - end - end; - {bad_pdu, _} -> - %% discard reply because it was malformed - inet:setopts(Socket, [{active, once}]), - {noreply, State} - end; - handle_info(close, State = #state{counter = Counter}) -> case Counter of 0 -> {stop, normal, State}; @@ -80,3 +72,71 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +handle_input(Socket, Info, #state{active_n = ActiveN} = State) -> + handle_input(Socket, Info, 0, ActiveN, State). + +handle_input(_Socket, _Info, _Cnt, _Max, #state{mode = inactive, counter = 0} = State) -> + {stop, normal, State}; +handle_input(Socket, _Info, Cnt, Max, State0) + when Cnt >= Max -> + %% break the loop and restart + self() ! {'$socket', Socket, select, undefined}, + {noreply, State0}; + +handle_input(Socket, Info, Cnt, Max, State0) -> + case socket:recvfrom(Socket, 0, [], nowait) of + {error, _} -> + State = handle_err_input(Socket, State0), + handle_input(Socket, Info, Cnt + 1, Max, State); + + {ok, {#{addr := IP, port := Port}, Data}} -> + ArrivalTS = erlang:monotonic_time(), + State = handle_message(ArrivalTS, IP, Port, Data, State0), + handle_input(Socket, Info, Cnt + 1, Max, State); + + {select, _SelectInfo} when Cnt == 0 -> + %% there must be something in the error queue + State = handle_err_input(Socket, State0), + handle_input(Socket, Info, Cnt + 1, Max, State); + + {select, _SelectInfo} -> + {noreply, State0} + end. + +handle_err_input(Socket, State) -> + case socket:recvmsg(Socket, [errqueue], 0) of + {ok, #{addr := #{addr := IP, port := Port}, ctrl := Ctrl}} -> + %% lists:foreach(handle_socket_error(_, IP, Port, State), Ctrl), + ok; + {error, timeout} -> + ok; + {error, ewouldblock} -> + ok; + + Other -> + ?LOG(error, "got unhandled error input: ~p", [Other]) + end, + State. + +handle_message(ArrivalTS, FromIP, FromPort, EncRequest, + #state{pending = Pending, mode = Mode, counter = Counter} = State) -> + case eradius_lib:decode_request_id(EncRequest) of + {ReqId, EncRequest} -> + case maps:find({FromIP, FromPort, ReqId}, Pending) of + error -> + %% discard reply because we didn't expect it + State; + {ok, WaitingSender} -> + WaitingSender ! {self(), response, ReqId, EncRequest}, + NPending = maps:remove({FromIP, FromPort, ReqId}, Pending), + State#state{pending = NPending, counter = Counter - 1} + end; + {bad_pdu, _} -> + %% discard reply because it was malformed + State + end. diff --git a/src/eradius_counter.erl b/src/eradius_counter.erl index 9e922550..0189b2d5 100644 --- a/src/eradius_counter.erl +++ b/src/eradius_counter.erl @@ -86,17 +86,17 @@ aggregate({Servers, {ResetTS, Nass}}) -> %% @doc Set Value for the given prometheus boolean metric by the given Name with %% the given values set_boolean_metric(Name, Labels, Value) -> - case code:is_loaded(prometheus) of - {file, _} -> + %% case code:is_loaded(prometheus) of + %% {file, _} -> try prometheus_boolean:set(Name, Labels, Value) catch _:_ -> prometheus_boolean:declare([{name, server_status}, {labels, [server_ip, server_port]}, {help, "Status of an upstream RADIUS Server"}]), prometheus_boolean:set(Name, Labels, Value) - end; - _ -> - ok + %% end; + %% _ -> + %% ok end. %% @doc Update the given histogram metric value @@ -104,8 +104,8 @@ set_boolean_metric(Name, Labels, Value) -> %% it is much easy to use histograms in this way. As we don't need to manage buckets and do %% the other histogram things in eradius, but prometheus.erl will do it for us observe(Name, {{ClientName, ClientIP, _}, {ServerName, ServerIP, ServerPort}} = MetricsInfo, Value, Help) -> - case code:is_loaded(prometheus) of - {file, _} -> + %% case code:is_loaded(prometheus) of + %% {file, _} -> try prometheus_histogram:observe(Name, [ServerIP, ServerPort, ServerName, ClientName, ClientIP], Value) catch _:_ -> @@ -114,13 +114,14 @@ observe(Name, {{ClientName, ClientIP, _}, {ServerName, ServerIP, ServerPort}} = {duration_unit, milliseconds}, {buckets, Buckets}, {help, Help}]), observe(Name, MetricsInfo, Value, Help) - end; - _ -> - ok + %% end; + %% _ -> + %% ok end. + observe(Name, #nas_prop{server_ip = ServerIP, server_port = ServerPort, nas_ip = NasIP, nas_id = NasId} = Nas, Value, ServerName, Help) -> - case code:is_loaded(prometheus) of - {file, _} -> + %% case code:is_loaded(prometheus) of + %% {file, _} -> try prometheus_histogram:observe(Name, [inet:ntoa(ServerIP), ServerPort, ServerName, inet:ntoa(NasIP), NasId], Value) catch _:_ -> @@ -129,9 +130,9 @@ observe(Name, #nas_prop{server_ip = ServerIP, server_port = ServerPort, nas_ip = {duration_unit, milliseconds}, {buckets, Buckets}, {help, Help}]), observe(Name, Nas, Value, ServerName, Help) - end; - _ -> - ok + %% end; + %% _ -> + %% ok end. %% helper to be called from the aggregator to fetch this nodes values diff --git a/src/eradius_lib.erl b/src/eradius_lib.erl index 71ac2de9..a4c73cbe 100644 --- a/src/eradius_lib.erl +++ b/src/eradius_lib.erl @@ -1,4 +1,6 @@ -module(eradius_lib). +-compile([export_all, nowarn_export_all]). + -export([del_attr/2, get_attr/2, encode_request/1, encode_reply/1, decode_request/2, decode_request/3, decode_request_id/1]). -export([random_authenticator/0, zero_authenticator/0, pad_to/2, set_attr/3, get_attributes/1, set_attributes/2]). -export([timestamp/0, timestamp/1, printable_peer/2, make_addr_info/1]). @@ -133,6 +135,8 @@ encode_eap_message(#radius_request{eap_msg = <<>>}, EncReq) -> EncReq. -spec encode_attributes(#radius_request{}, attribute_list()) -> {binary(), non_neg_integer()}. +encode_attributes(_Req, Attributes) when is_binary(Attributes) -> + {Attributes, byte_size(Attributes)}; encode_attributes(Req, Attributes) -> F = fun ({A = #attribute{}, Val}, {Body, BodySize}) -> EncAttr = encode_attribute(Req, A, Val),