From 09d958e0ede69c61d726642bdc72a9655b98b5ca Mon Sep 17 00:00:00 2001 From: wireknight Date: Wed, 29 Nov 2023 00:11:28 +0100 Subject: [PATCH] Work in progress on object reuse --- .../core/reactorsystem/ReActorSystem.java | 9 ++- .../chroniclequeue/CQDeserializer.java | 1 + .../chroniclequeue/CQLocalDriver.java | 58 +++++++++---------- .../chroniclequeue/CQRemoteDriver.java | 17 +++--- .../channels/chroniclequeue/CQSerializer.java | 3 - 5 files changed, 42 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/io/reacted/core/reactorsystem/ReActorSystem.java b/core/src/main/java/io/reacted/core/reactorsystem/ReActorSystem.java index a1ac6b7d..b73fbbe5 100644 --- a/core/src/main/java/io/reacted/core/reactorsystem/ReActorSystem.java +++ b/core/src/main/java/io/reacted/core/reactorsystem/ReActorSystem.java @@ -248,10 +248,15 @@ public boolean isSystemDeadLetters(ReActorRef anyRef) { * @param args Arguments for Sl4j */ public void logError(String errorDescription, Serializable ...args) { - if (getSystemLogger().publish(getSystemSink(), new ReActedError(errorDescription, args)) - .isNotSent()) { + try { + if (getSystemLogger().publish(getSystemSink(), new ReActedError(errorDescription, args)) + .isNotSent()) { LOGGER.error("Unable to log error: {}", errorDescription); LOGGER.error(errorDescription, (Object) args); + } + } catch (Exception anyException) { + LOGGER.error("CRITIC! Nested exception while logging an error! ", anyException); + LOGGER.error("Original error:", (Object[]) args); } } diff --git a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQDeserializer.java b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQDeserializer.java index b0daa644..cd87e92b 100644 --- a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQDeserializer.java +++ b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQDeserializer.java @@ -13,6 +13,7 @@ import net.openhft.chronicle.wire.WireIn; import java.io.Serializable; +import java.util.Objects; @NonNullByDefault public class CQDeserializer implements Deserializer { diff --git a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQLocalDriver.java b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQLocalDriver.java index a3d30d3a..e4e4e484 100644 --- a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQLocalDriver.java +++ b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQLocalDriver.java @@ -25,6 +25,7 @@ import io.reacted.patterns.Try; import io.reacted.patterns.UnChecked; import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.RollCycles; import net.openhft.chronicle.threads.Pauser; @@ -35,6 +36,7 @@ import javax.annotation.Nullable; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -42,9 +44,10 @@ @NonNullByDefault public class CQLocalDriver extends LocalDriver { private static final Logger LOGGER = LoggerFactory.getLogger(CQLocalDriver.class); - private final ThreadLocal serializerThreadLocal = ThreadLocal.withInitial(() -> null); + private final ThreadLocal localSerializer = ThreadLocal.withInitial(CQSerializer::new); @Nullable private ChronicleQueue chronicle; + private final ThreadLocal localAppender = ThreadLocal.withInitial(() -> chronicle.createAppender()); @Nullable private ExcerptTailer cqTailer; @@ -99,16 +102,14 @@ CompletionStage sendAsyncMessage(ReActorRef source, ReActorConte sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination, long seqNum, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) { try { - Serializer serializer = serializerThreadLocal.get(); - if (serializer == null) { - serializer = new CQSerializer(Objects.requireNonNull(chronicle.acquireAppender() - .wire())); - serializerThreadLocal.set(serializer); + CQSerializer serializer = localSerializer.get(); + try(var ctx = localAppender.get().acquireWritingDocument(false)) { + serializer.setSerializerOutput(Objects.requireNonNull(ctx.wire())); + writeMessage(serializer, source, destination, seqNum, reActorSystemId, ackingPolicy, message); } - writeMessage(serializer, source, destination, seqNum, reActorSystemId, - ackingPolicy, message); return DeliveryStatus.SENT; } catch (Exception anyException) { + LOGGER.error("ERROR SENDING MESSAGE:", anyException); getLocalReActorSystem().logError("Unable to send message {}", message, anyException); return DeliveryStatus.NOT_SENT; } @@ -116,31 +117,26 @@ CompletionStage sendAsyncMessage(ReActorRef source, ReActorConte @Override public CompletionStage> cleanDriverLoop() { + Optional.ofNullable(localAppender.get()).ifPresent(ExcerptAppender::close); return CompletableFuture.completedFuture(Try.ofRunnable(() -> Objects.requireNonNull(chronicle).close())); } private void chronicleMainLoop(ExcerptTailer tailer) { var waitForNextMsg = Pauser.balanced(); - try(DocumentContext documentContext = tailer.readingDocument()) { - var deserializer = new CQDeserializer(); - ReadMarshallable reader = wireIn -> { - deserializer.setDeserializerInput(wireIn); - readMessage(deserializer); - }; - while (!Thread.currentThread().isInterrupted()) { - try { - tailer.readDocument(reader); - if (documentContext.isPresent()) { - readMessage(deserializer); - waitForNextMsg.reset(); - } else { - waitForNextMsg.pause(); - } - } - catch (Exception anyException) { - LOGGER.error("Unable to decode data", anyException); + var deserializer = new CQDeserializer(); + while (!Thread.currentThread().isInterrupted()) { + try(var ctx = tailer.readingDocument(false)) { + if (!ctx.isPresent()) { + waitForNextMsg.pause(); + } else { + deserializer.setDeserializerInput(Objects.requireNonNull(ctx.wire())); + readMessage(deserializer); + waitForNextMsg.reset(); } } + catch (Exception anyException) { + LOGGER.error("Unable to decode data", anyException); + } } } @@ -164,12 +160,10 @@ void writeMessage(Serializer out, ReActorRef source, ReActorRef destination, lon } public static ReActorRef readReActorRef(Deserializer in) { - ReActorId reActorId = new ReActorId(); - reActorId.decode(in); - ReActorSystemRef reActorSystemRef = new ReActorSystemRef(); - reActorSystemRef.decode(in); - return new ReActorRef(reActorId, reActorSystemRef) - .setHashCode(Objects.hash(reActorId, reActorSystemRef)); + var ref = new ReActorRef(); + ref.decode(in); + ref.setHashCode(Objects.hash(ref.getReActorId(), ref.getReActorSystemRef())); + return ref; } public static ReActorSystemId readReActorSystemId(Deserializer in) { ReActorSystemId reActorSystemId = new ReActorSystemId(); diff --git a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQRemoteDriver.java b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQRemoteDriver.java index 7db69c00..c2716c45 100644 --- a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQRemoteDriver.java +++ b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQRemoteDriver.java @@ -23,6 +23,7 @@ import io.reacted.patterns.Try; import io.reacted.patterns.UnChecked; import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.threads.Pauser; import net.openhft.chronicle.wire.DocumentContext; @@ -37,10 +38,11 @@ @NonNullByDefault public class CQRemoteDriver extends RemotingDriver { - private final ThreadLocal serializerThreadLocal = ThreadLocal.withInitial(() -> null); - + private final ThreadLocal localSerializer = ThreadLocal.withInitial(CQSerializer::new); @Nullable private ChronicleQueue chronicle; + + private final ThreadLocal localAppender = ThreadLocal.withInitial(() -> chronicle.createAppender()); @Nullable private ExcerptTailer cqTailer; @@ -91,14 +93,11 @@ public ChannelId getChannelId() { DeliveryStatus sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination, long seqNum, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) { - Serializer serializer = serializerThreadLocal.get(); - if (serializer == null) { - serializer = new CQSerializer(Objects.requireNonNull(chronicle.acquireAppender() - .wire())); - serializerThreadLocal.set(serializer); + CQSerializer serializer = localSerializer.get(); + try(var ctx = localAppender.get().acquireWritingDocument(false)) { + serializer.setSerializerOutput(Objects.requireNonNull(ctx.wire())); + return sendMessage(getLocalReActorSystem(), serializer, source, destination, seqNum, ackingPolicy, message); } - return sendMessage(getLocalReActorSystem(), serializer, - source, destination, seqNum, ackingPolicy, message); } private void cqRemoteDriverMainLoop(ExcerptTailer cqTailer, ChronicleQueue chronicle) { diff --git a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQSerializer.java b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQSerializer.java index a52d37e3..d80f111a 100644 --- a/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQSerializer.java +++ b/drivers/src/main/java/io/reacted/drivers/channels/chroniclequeue/CQSerializer.java @@ -18,9 +18,6 @@ public class CQSerializer implements Serializer { private WireOut out; - - CQSerializer(WireOut out) {setSerializerOutput(out);} - CQSerializer setSerializerOutput(WireOut out) { this.out = out; return this;