Skip to content

Commit

Permalink
Work in progress on object reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
wireknight committed Nov 28, 2023
1 parent cc7bb02 commit 09d958e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,15 @@ public boolean isSystemDeadLetters(ReActorRef anyRef) {
* @param args Arguments for Sl4j
*/
public void logError(String errorDescription, Serializable ...args) {
if (getSystemLogger().publish(getSystemSink(), new ReActedError(errorDescription, args))
.isNotSent()) {
try {
if (getSystemLogger().publish(getSystemSink(), new ReActedError(errorDescription, args))
.isNotSent()) {
LOGGER.error("Unable to log error: {}", errorDescription);
LOGGER.error(errorDescription, (Object) args);
}
} catch (Exception anyException) {
LOGGER.error("CRITIC! Nested exception while logging an error! ", anyException);
LOGGER.error("Original error:", (Object[]) args);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import net.openhft.chronicle.wire.WireIn;

import java.io.Serializable;
import java.util.Objects;

@NonNullByDefault
public class CQDeserializer implements Deserializer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.threads.Pauser;
Expand All @@ -35,16 +36,18 @@

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@NonNullByDefault
public class CQLocalDriver extends LocalDriver<CQLocalDriverConfig> {
private static final Logger LOGGER = LoggerFactory.getLogger(CQLocalDriver.class);
private final ThreadLocal<Serializer> serializerThreadLocal = ThreadLocal.withInitial(() -> null);
private final ThreadLocal<CQSerializer> localSerializer = ThreadLocal.withInitial(CQSerializer::new);
@Nullable
private ChronicleQueue chronicle;
private final ThreadLocal<ExcerptAppender> localAppender = ThreadLocal.withInitial(() -> chronicle.createAppender());
@Nullable
private ExcerptTailer cqTailer;

Expand Down Expand Up @@ -99,48 +102,41 @@ CompletionStage<DeliveryStatus> sendAsyncMessage(ReActorRef source, ReActorConte
sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination, long seqNum,
ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) {
try {
Serializer serializer = serializerThreadLocal.get();
if (serializer == null) {
serializer = new CQSerializer(Objects.requireNonNull(chronicle.acquireAppender()
.wire()));
serializerThreadLocal.set(serializer);
CQSerializer serializer = localSerializer.get();
try(var ctx = localAppender.get().acquireWritingDocument(false)) {
serializer.setSerializerOutput(Objects.requireNonNull(ctx.wire()));
writeMessage(serializer, source, destination, seqNum, reActorSystemId, ackingPolicy, message);
}
writeMessage(serializer, source, destination, seqNum, reActorSystemId,
ackingPolicy, message);
return DeliveryStatus.SENT;
} catch (Exception anyException) {
LOGGER.error("ERROR SENDING MESSAGE:", anyException);
getLocalReActorSystem().logError("Unable to send message {}", message, anyException);
return DeliveryStatus.NOT_SENT;
}
}

@Override
public CompletionStage<Try<Void>> cleanDriverLoop() {
Optional.ofNullable(localAppender.get()).ifPresent(ExcerptAppender::close);
return CompletableFuture.completedFuture(Try.ofRunnable(() -> Objects.requireNonNull(chronicle).close()));
}

private void chronicleMainLoop(ExcerptTailer tailer) {
var waitForNextMsg = Pauser.balanced();
try(DocumentContext documentContext = tailer.readingDocument()) {
var deserializer = new CQDeserializer();
ReadMarshallable reader = wireIn -> {
deserializer.setDeserializerInput(wireIn);
readMessage(deserializer);
};
while (!Thread.currentThread().isInterrupted()) {
try {
tailer.readDocument(reader);
if (documentContext.isPresent()) {
readMessage(deserializer);
waitForNextMsg.reset();
} else {
waitForNextMsg.pause();
}
}
catch (Exception anyException) {
LOGGER.error("Unable to decode data", anyException);
var deserializer = new CQDeserializer();
while (!Thread.currentThread().isInterrupted()) {
try(var ctx = tailer.readingDocument(false)) {
if (!ctx.isPresent()) {
waitForNextMsg.pause();
} else {
deserializer.setDeserializerInput(Objects.requireNonNull(ctx.wire()));
readMessage(deserializer);
waitForNextMsg.reset();
}
}
catch (Exception anyException) {
LOGGER.error("Unable to decode data", anyException);
}
}
}

Expand All @@ -164,12 +160,10 @@ void writeMessage(Serializer out, ReActorRef source, ReActorRef destination, lon
}

public static ReActorRef readReActorRef(Deserializer in) {
ReActorId reActorId = new ReActorId();
reActorId.decode(in);
ReActorSystemRef reActorSystemRef = new ReActorSystemRef();
reActorSystemRef.decode(in);
return new ReActorRef(reActorId, reActorSystemRef)
.setHashCode(Objects.hash(reActorId, reActorSystemRef));
var ref = new ReActorRef();
ref.decode(in);
ref.setHashCode(Objects.hash(ref.getReActorId(), ref.getReActorSystemRef()));
return ref;
}
public static ReActorSystemId readReActorSystemId(Deserializer in) {
ReActorSystemId reActorSystemId = new ReActorSystemId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
Expand All @@ -37,10 +38,11 @@

@NonNullByDefault
public class CQRemoteDriver extends RemotingDriver<CQRemoteDriverConfig> {
private final ThreadLocal<Serializer> serializerThreadLocal = ThreadLocal.withInitial(() -> null);

private final ThreadLocal<CQSerializer> localSerializer = ThreadLocal.withInitial(CQSerializer::new);
@Nullable
private ChronicleQueue chronicle;

private final ThreadLocal<ExcerptAppender> localAppender = ThreadLocal.withInitial(() -> chronicle.createAppender());
@Nullable
private ExcerptTailer cqTailer;

Expand Down Expand Up @@ -91,14 +93,11 @@ public ChannelId getChannelId() {
DeliveryStatus sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination,
long seqNum, ReActorSystemId reActorSystemId,
AckingPolicy ackingPolicy, PayloadT message) {
Serializer serializer = serializerThreadLocal.get();
if (serializer == null) {
serializer = new CQSerializer(Objects.requireNonNull(chronicle.acquireAppender()
.wire()));
serializerThreadLocal.set(serializer);
CQSerializer serializer = localSerializer.get();
try(var ctx = localAppender.get().acquireWritingDocument(false)) {
serializer.setSerializerOutput(Objects.requireNonNull(ctx.wire()));
return sendMessage(getLocalReActorSystem(), serializer, source, destination, seqNum, ackingPolicy, message);
}
return sendMessage(getLocalReActorSystem(), serializer,
source, destination, seqNum, ackingPolicy, message);
}

private void cqRemoteDriverMainLoop(ExcerptTailer cqTailer, ChronicleQueue chronicle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
public class CQSerializer implements Serializer {

private WireOut out;

CQSerializer(WireOut out) {setSerializerOutput(out);}

CQSerializer setSerializerOutput(WireOut out) {
this.out = out;
return this;
Expand Down

0 comments on commit 09d958e

Please sign in to comment.