diff --git a/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapter.java b/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapter.java index 94c39df252..f14170731e 100644 --- a/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapter.java +++ b/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapter.java @@ -35,7 +35,6 @@ import com.hivemq.edge.adapters.modbus.config.ModbusAdu; import com.hivemq.edge.adapters.modbus.config.ModbusDataType; import com.hivemq.edge.adapters.modbus.config.ModbusSpecificAdapterConfig; -import com.hivemq.edge.adapters.modbus.config.ModbusToMqttMapping; import com.hivemq.edge.adapters.modbus.config.tag.ModbusTag; import com.hivemq.edge.adapters.modbus.config.tag.ModbusTagDefinition; import com.hivemq.edge.adapters.modbus.impl.ModbusClient; @@ -45,10 +44,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.CONNECTED; import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.DISCONNECTED; @@ -60,7 +59,7 @@ public class ModbusProtocolAdapter implements PollingProtocolAdapter { private final @NotNull ProtocolAdapterState protocolAdapterState; private final @NotNull ModbusClient modbusClient; - private final @NotNull Map> lastSamples = new HashMap<>(); + private final @NotNull Map> lastSamples = new ConcurrentHashMap<>(); private final @NotNull List tags; private final @NotNull String adapterId; @@ -91,6 +90,7 @@ public void start( @Override public void stop(final @NotNull ProtocolAdapterStopInput input, final @NotNull ProtocolAdapterStopOutput output) { + lastSamples.clear(); modbusClient.disconnect().whenComplete((unused, t) -> { if (t == null) { output.stoppedSuccessfully(); @@ -114,7 +114,7 @@ public void poll( } private void pollModbus( - @NotNull PollingInput pollingInput, @NotNull PollingOutput pollingOutput, @NotNull ModbusTag modbusTag) { + final @NotNull PollingInput pollingInput, final @NotNull PollingOutput pollingOutput, final @NotNull ModbusTag modbusTag) { readRegisters(pollingInput.getPollingContext(), modbusClient, modbusTag).whenComplete((modbusdata, throwable) -> { @@ -183,9 +183,9 @@ protected void captureDataSample(final @NotNull ModBusData modBusData, final @No } private void calculateDelta(final @NotNull ModBusData modBusData, final @NotNull PollingOutput pollingOutput) { - final ModbusToMqttMapping subscription = (ModbusToMqttMapping) modBusData.getPollingContext(); + final PollingContext pollingContext = modBusData.getPollingContext(); - final List previousSampleDataPoints = lastSamples.put(subscription, modBusData.getDataPoints()); + final List previousSampleDataPoints = lastSamples.put(pollingContext, modBusData.getDataPoints()); final List currentSamplePoints = modBusData.getDataPoints(); final List delta = AdapterDataUtils.mergeChangedSamples(previousSampleDataPoints, currentSamplePoints); diff --git a/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/util/AdapterDataUtils.java b/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/util/AdapterDataUtils.java index 3f786e9055..15bc5bcaff 100644 --- a/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/util/AdapterDataUtils.java +++ b/modules/hivemq-edge-module-modbus/src/main/java/com/hivemq/edge/adapters/modbus/util/AdapterDataUtils.java @@ -19,29 +19,25 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; public class AdapterDataUtils { - public static boolean matches(final @NotNull DataPoint point, final @NotNull List list) { - return list.stream().filter(dp -> dp.getTagName().equals(point.getTagName())) // First filter by tagName - .anyMatch(dp -> dp.getTagValue().equals(point.getTagValue())); // Then check for tagValue - } public static @NotNull List mergeChangedSamples( final @Nullable List historicalSamples, final @NotNull List currentSamples) { if (historicalSamples == null) { return currentSamples; } - List delta = new ArrayList<>(); - for (int i = 0; i < currentSamples.size(); i++) { - DataPoint currentSample = currentSamples.get(i); - // If the current sample does not match any in the historical samples, it has changed - if (!matches(currentSample, historicalSamples)) { - historicalSamples.set(i, currentSample); - delta.add(currentSample); - } - } - return delta; + + final Map historicalSamplesMap = historicalSamples.stream() + .collect(Collectors.toMap(DataPoint::getTagName, Function.identity())); + + return currentSamples.stream() + .filter(sample -> + !(historicalSamplesMap.containsKey(sample.getTagName()) && historicalSamplesMap.get(sample.getTagName()).getTagValue().equals(sample.getTagValue()))) + .collect(Collectors.toList()); } } diff --git a/modules/hivemq-edge-module-modbus/src/test/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapterTest.java b/modules/hivemq-edge-module-modbus/src/test/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapterTest.java index 21adcb573e..790b783da8 100644 --- a/modules/hivemq-edge-module-modbus/src/test/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-modbus/src/test/java/com/hivemq/edge/adapters/modbus/ModbusProtocolAdapterTest.java @@ -21,11 +21,13 @@ import com.hivemq.edge.adapters.modbus.model.ModBusData; import com.hivemq.edge.adapters.modbus.util.AdapterDataUtils; import com.hivemq.edge.modules.adapters.data.DataPointImpl; +import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; import java.util.List; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; class ModbusProtocolAdapterTest { @@ -51,11 +53,13 @@ void test_mergedSamples() { final ModBusData data2 = createSampleData(); data2.getDataPoints().set(5, new DataPointImpl("register-5", 777)); - AdapterDataUtils.mergeChangedSamples(data1.getDataPoints(), data2.getDataPoints()); + final List dataPoints = + AdapterDataUtils.mergeChangedSamples(data1.getDataPoints(), data2.getDataPoints()); - assertEquals(777, - ((DataPoint) data1.getDataPoints().get(5)).getTagValue(), - "Merged data should contain new value"); + assertThat(dataPoints) + .hasSize(1) + .extracting(DataPoint::getTagValue, DataPoint::getTagName) + .containsExactly(Tuple.tuple(777, "register-5")); } protected static ModBusData createSampleData() {