From d637a665e8353411d15add7aab04de1237f076a6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 15 Sep 2022 23:42:47 +0200 Subject: [PATCH] fix(inputs.modbus): Do not fail if a single slave reports errors (#11785) --- plugins/inputs/modbus/modbus.go | 74 ++++++++++------- plugins/inputs/modbus/modbus_test.go | 118 +++++++++++++++++++++++++-- 2 files changed, 154 insertions(+), 38 deletions(-) diff --git a/plugins/inputs/modbus/modbus.go b/plugins/inputs/modbus/modbus.go index 840763be785e6..26e42e88205eb 100644 --- a/plugins/inputs/modbus/modbus.go +++ b/plugins/inputs/modbus/modbus.go @@ -146,26 +146,24 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error { } } - timestamp := time.Now() - for retry := 0; retry <= m.Retries; retry++ { - timestamp = time.Now() - if err := m.gatherFields(); err != nil { - if mberr, ok := err.(*mb.Error); ok && mberr.ExceptionCode == mb.ExceptionCodeServerDeviceBusy && retry < m.Retries { - m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry) - time.Sleep(time.Duration(m.RetriesWaitTime)) - continue - } - // Show the disconnect error this way to not shadow the initial error - if discerr := m.disconnect(); discerr != nil { - m.Log.Errorf("Disconnecting failed: %v", discerr) + for slaveID, requests := range m.requests { + m.Log.Debugf("Reading slave %d for %s...", slaveID, m.Controller) + if err := m.readSlaveData(slaveID, requests); err != nil { + acc.AddError(fmt.Errorf("slave %d: %w", slaveID, err)) + mberr, ok := err.(*mb.Error) + if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy { + m.Log.Debugf("Reconnecting to %s...", m.Controller) + if err := m.disconnect(); err != nil { + return fmt.Errorf("disconnecting failed: %w", err) + } + if err := m.connect(); err != nil { + return fmt.Errorf("connecting failed: %w", err) + } } - return err + continue } - // Reading was successful, leave the retry loop - break - } + timestamp := time.Now() - for slaveID, requests := range m.requests { tags := map[string]string{ "name": m.Name, "type": cCoils, @@ -276,24 +274,40 @@ func (m *Modbus) disconnect() error { return err } -func (m *Modbus) gatherFields() error { - for slaveID, requests := range m.requests { - m.handler.SetSlave(slaveID) - if err := m.gatherRequestsCoil(requests.coil); err != nil { - return err - } - if err := m.gatherRequestsDiscrete(requests.discrete); err != nil { - return err - } - if err := m.gatherRequestsHolding(requests.holding); err != nil { - return err +func (m *Modbus) readSlaveData(slaveID byte, requests requestSet) error { + m.handler.SetSlave(slaveID) + + for retry := 0; retry < m.Retries; retry++ { + err := m.gatherFields(requests) + if err == nil { + // Reading was successful + return nil } - if err := m.gatherRequestsInput(requests.input); err != nil { + + // Exit in case a non-recoverable error occurred + mberr, ok := err.(*mb.Error) + if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy { return err } + + // Wait some time and try again reading the slave. + m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry) + time.Sleep(time.Duration(m.RetriesWaitTime)) } + return m.gatherFields(requests) +} - return nil +func (m *Modbus) gatherFields(requests requestSet) error { + if err := m.gatherRequestsCoil(requests.coil); err != nil { + return err + } + if err := m.gatherRequestsDiscrete(requests.discrete); err != nil { + return err + } + if err := m.gatherRequestsHolding(requests.holding); err != nil { + return err + } + return m.gatherRequestsInput(requests.input) } func (m *Modbus) gatherRequestsCoil(requests []request) error { diff --git a/plugins/inputs/modbus/modbus_test.go b/plugins/inputs/modbus/modbus_test.go index e6f42cae04df2..96fe324d7595b 100644 --- a/plugins/inputs/modbus/modbus_test.go +++ b/plugins/inputs/modbus/modbus_test.go @@ -1050,9 +1050,9 @@ func TestRetryFailExhausted(t *testing.T) { require.NoError(t, modbus.Init()) require.NotEmpty(t, modbus.requests) - err := modbus.Gather(&acc) - require.Error(t, err) - require.Equal(t, "modbus: exception '6' (server device busy), function '129'", err.Error()) + require.NoError(t, modbus.Gather(&acc)) + require.Len(t, acc.Errors, 1) + require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '6' (server device busy), function '129'") } func TestRetryFailIllegal(t *testing.T) { @@ -1072,7 +1072,8 @@ func TestRetryFailIllegal(t *testing.T) { data[1] = byte(0) return data, &mbserver.IllegalFunction - }) + }, + ) modbus := Modbus{ Name: "TestRetryFailExhausted", @@ -1092,9 +1093,9 @@ func TestRetryFailIllegal(t *testing.T) { require.NoError(t, modbus.Init()) require.NotEmpty(t, modbus.requests) - err := modbus.Gather(&acc) - require.Error(t, err) - require.Equal(t, "modbus: exception '1' (illegal function), function '129'", err.Error()) + require.NoError(t, modbus.Gather(&acc)) + require.Len(t, acc.Errors, 1) + require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '1' (illegal function), function '129'") require.Equal(t, counter, 1) } @@ -1866,7 +1867,8 @@ func TestRequestsStartingWithOmits(t *testing.T) { Log: testutil.Logger{}, } modbus.Requests = []requestDefinition{ - {SlaveID: 1, + { + SlaveID: 1, ByteOrder: "ABCD", RegisterType: "holding", Fields: []requestFieldDefinition{ @@ -1942,3 +1944,103 @@ func TestRequestsEmptyFields(t *testing.T) { err := modbus.Init() require.EqualError(t, err, `configuraton invalid: found request section without fields`) } + +func TestMultipleSlavesOneFail(t *testing.T) { + telegraf.Debug = true + modbus := Modbus{ + Name: "Test", + Controller: "tcp://localhost:1502", + Retries: 1, + ConfigurationType: "request", + Log: testutil.Logger{}, + } + modbus.Requests = []requestDefinition{ + { + SlaveID: 1, + ByteOrder: "ABCD", + RegisterType: "holding", + Fields: []requestFieldDefinition{ + { + Name: "holding-0", + Address: uint16(0), + InputType: "INT16", + }, + }, + }, + { + SlaveID: 2, + ByteOrder: "ABCD", + RegisterType: "holding", + Fields: []requestFieldDefinition{ + { + Name: "holding-0", + Address: uint16(0), + InputType: "INT16", + }, + }, + }, + { + SlaveID: 3, + ByteOrder: "ABCD", + RegisterType: "holding", + Fields: []requestFieldDefinition{ + { + Name: "holding-0", + Address: uint16(0), + InputType: "INT16", + }, + }, + }, + } + require.NoError(t, modbus.Init()) + + serv := mbserver.NewServer() + require.NoError(t, serv.ListenTCP("localhost:1502")) + defer serv.Close() + + serv.RegisterFunctionHandler(3, + func(s *mbserver.Server, frame mbserver.Framer) ([]byte, *mbserver.Exception) { + tcpframe, ok := frame.(*mbserver.TCPFrame) + if !ok { + return nil, &mbserver.IllegalFunction + } + + if tcpframe.Device == 2 { + // Simulate device 2 being unavailable + return []byte{}, &mbserver.GatewayTargetDeviceFailedtoRespond + } + return []byte{0x02, 0x00, 0x42}, &mbserver.Success + }, + ) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "modbus", + map[string]string{ + "type": cHoldingRegisters, + "slave_id": "1", + "name": modbus.Name, + }, + map[string]interface{}{"holding-0": int16(0x42)}, + time.Unix(0, 0), + ), + testutil.MustMetric( + "modbus", + map[string]string{ + "type": cHoldingRegisters, + "slave_id": "3", + "name": modbus.Name, + }, + map[string]interface{}{"holding-0": int16(0x42)}, + time.Unix(0, 0), + ), + } + + var acc testutil.Accumulator + require.NoError(t, modbus.Gather(&acc)) + acc.Wait(len(expected)) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) + require.Len(t, acc.Errors, 1) + require.EqualError(t, acc.FirstError(), "slave 2: modbus: exception '11' (gateway target device failed to respond), function '131'") +}