Skip to content

Commit

Permalink
Fixing main core with the new de/serialization abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
wireknight committed May 11, 2023
1 parent c7810d4 commit 371439c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class CQDeserializer implements Deserializer {
private WireIn input;

public CQDeserializer(WireIn input) { setDeserializerInput(input); }
public CQDeserializer() { }

CQDeserializer setDeserializerInput(WireIn input) { this.input = input; return this; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ReadMarshallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,9 +122,14 @@ public CompletionStage<Try<Void>> cleanDriverLoop() {
private void chronicleMainLoop(ExcerptTailer tailer) {
var waitForNextMsg = Pauser.balanced();
try(DocumentContext documentContext = tailer.readingDocument()) {
Deserializer deserializer = new CQDeserializer(Objects.requireNonNull(documentContext.wire()));
var deserializer = new CQDeserializer();
ReadMarshallable reader = wireIn -> {
deserializer.setDeserializerInput(wireIn);
readMessage(deserializer);
};
while (!Thread.currentThread().isInterrupted()) {
try {
tailer.readDocument(reader);
if (documentContext.isPresent()) {
readMessage(deserializer);
waitForNextMsg.reset();
Expand Down Expand Up @@ -154,7 +160,7 @@ void writeMessage(Serializer out, ReActorRef source, ReActorRef destination, lon
out.put(seqNum);
localReActorSystemId.encode(out);
out.putEnum(ackingPolicy);
out.putObject(payload);
payload.encode(out);
}

public static ReActorRef readReActorRef(Deserializer in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ReadMarshallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -116,11 +117,15 @@ private void replayerMainLoop(ReActorSystem localReActorSystem, ChronicleQueue c
Map<ReActorId, Map<Long, Message>> dstToMessageBySeqNum = new HashMap<>();
Pauser pauser = Pauser.balanced();
try(DocumentContext documentContext = chronicleReader.readingDocument()) {
Deserializer deserializer = new CQDeserializer(Objects.requireNonNull(documentContext.wire()));
var deserializer = new CQDeserializer();
ReadMarshallable reader = wireIn -> {
deserializer.setDeserializerInput(wireIn);
readMessage(deserializer, localReActorSystem, emptyMap, dstToMessageBySeqNum);
};
while (!Thread.currentThread()
.isInterrupted() && !chronicle.isClosed()) {
try {
if (documentContext.isPresent()) {
if (chronicleReader.readDocument(reader)
readMessage(deserializer, localReActorSystem, emptyMap, dstToMessageBySeqNum);
pauser.reset();
} else {
Expand Down

0 comments on commit 371439c

Please sign in to comment.