Skip to content

Commit

Permalink
Merge branch 'ctx-rename'
Browse files Browse the repository at this point in the history
  • Loading branch information
wireknight committed Aug 1, 2023
2 parents 9546b96 + 371439c commit 00a0db3
Show file tree
Hide file tree
Showing 158 changed files with 2,408 additions and 1,661 deletions.
24 changes: 10 additions & 14 deletions core/src/main/java/io/reacted/core/config/ChannelId.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
package io.reacted.core.config;

import io.reacted.core.messages.SerializationUtils;
import io.reacted.core.serialization.Deserializer;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.core.serialization.Serializer;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;

import javax.annotation.concurrent.Immutable;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
Expand All @@ -27,7 +26,7 @@

@Immutable
@NonNullByDefault
public class ChannelId implements Externalizable {
public class ChannelId implements ReActedMessage {
public static final ChannelId NO_CHANNEL_ID = ChannelType.NULL_CHANNEL_TYPE.forChannelName("");
public static final ChannelId INVALID_CHANNEL_ID = ChannelType.INVALID_CHANNEL_TYPE
.forChannelName("INVALID CHANNEL NAME");
Expand Down Expand Up @@ -57,20 +56,17 @@ private ChannelId(ChannelType channelType, String channelName) {
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(channelType.ordinal());
//out.writeObject(channelType);
out.writeObject(channelName);
//out.writeInt(hashCode);
public void encode(Serializer serializer) {
serializer.put(channelType.ordinal());
serializer.put(channelName);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
setChannelType(ChannelType.forOrdinal(in.readInt()));
setChannelName((String)in.readObject());
public void decode(Deserializer deserializer) {
setChannelType(ChannelType.forOrdinal(deserializer.getInt()));
setChannelName(deserializer.getString());
setHashCode(Objects.hash(getChannelType(), getChannelName()));
}

public static Optional<ChannelId> fromToString(String inputString) {
return Try.of(() -> inputString.split(SEPARATOR))
.map(split -> new ChannelId(ChannelType.valueOf(split[0]), split[1]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -63,12 +63,12 @@ public Properties getChannelProperties() {
}

@Override
public <PayloadT extends Serializable> DeliveryStatus
public <PayloadT extends ReActedMessage> DeliveryStatus
sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination,
long seqNum, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) {
return destinationCtx.isStop()
? DeliveryStatus.NOT_DELIVERED
: localDeliver(destinationCtx, new Message(source, destination, seqNum, reActorSystemId,
ackingPolicy, message));
: localDeliver(destinationCtx,
Message.of(source, destination, seqNum, reActorSystemId, ackingPolicy, message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -76,7 +76,7 @@ public CompletionStage<Try<Void>> cleanDriverLoop() {
public Properties getChannelProperties() { return new Properties(); }

@Override
public <PayloadT extends Serializable> DeliveryStatus
public <PayloadT extends ReActedMessage> DeliveryStatus
sendMessage(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination, long seqNum,
ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) {
logFile.println(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;

import java.io.PrintWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Properties;
Expand Down Expand Up @@ -79,7 +79,7 @@ public CompletionStage<Try<Void>> cleanDriverLoop() {
public Properties getChannelProperties() { return new Properties(); }

@Override
public <PayloadT extends Serializable> DeliveryStatus
public <PayloadT extends ReActedMessage> DeliveryStatus
sendMessage(ReActorRef src, ReActorContext destinationCtx, ReActorRef destination, long seqNum, ReActorSystemId reActorSystemId,
AckingPolicy ackingPolicy, PayloadT message) {
synchronized (logFile) {
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/java/io/reacted/core/drivers/system/LocalDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.UnChecked.TriConsumer;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -34,40 +34,40 @@ protected LocalDriver(ConfigT config) {
}

@Override
public final <PayloadT extends Serializable>
public final <PayloadT extends ReActedMessage>
DeliveryStatus publish(ReActorRef src, ReActorRef dst, PayloadT message) {
throw new UnsupportedOperationException();
}

@Override
public final <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef src, ReActorRef dst,
@Nullable TriConsumer<ReActorId, Serializable, ReActorRef> propagateToSubscribers, PayloadT message) {
public final <PayloadT extends ReActedMessage> DeliveryStatus publish(ReActorRef src, ReActorRef dst,
@Nullable TriConsumer<ReActorId, ReActedMessage, ReActorRef> propagateToSubscribers, PayloadT message) {
throw new UnsupportedOperationException();
}

@Override
public <PayloadT extends Serializable> DeliveryStatus tell(ReActorRef src, ReActorRef dst, PayloadT message) {
public <PayloadT extends ReActedMessage> DeliveryStatus tell(ReActorRef src, ReActorRef dst, PayloadT message) {
throw new UnsupportedOperationException();
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT message) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT message) {
return CompletableFuture.failedStage(new UnsupportedOperationException());
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy,
TriConsumer<ReActorId, Serializable, ReActorRef> propagateToSubscribers, PayloadT message) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy,
TriConsumer<ReActorId, ReActedMessage, ReActorRef> propagateToSubscribers, PayloadT message) {
return CompletableFuture.failedStage(new UnsupportedOperationException());
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> atell(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT message) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> atell(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT message) {
return CompletableFuture.failedStage(new UnsupportedOperationException());
}

@Override
protected final <PayloadT extends Serializable> void
protected final <PayloadT extends ReActedMessage> void
offerMessage(ReActorRef source, ReActorRef destination, long sequenceNumber, ReActorSystemId fromReActorSystemId,
AckingPolicy ackingPolicy, PayloadT payload) {
ReActorId destinationId = destination.getReActorId();
Expand All @@ -91,7 +91,7 @@ public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> atell(ReA
}
}
}
protected static <PayloadT extends Serializable> DeliveryStatus
protected static <PayloadT extends ReActedMessage> DeliveryStatus
syncForwardMessageToLocalActor(ReActorRef source, ReActorContext destinationCtx, ReActorRef destination,
long sequenceNumber, ReActorSystemId fromReActorSystemId, AckingPolicy ackingPolicy,
PayloadT payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.serialization.ReActedMessage;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import io.reacted.patterns.UnChecked.TriConsumer;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -33,7 +33,7 @@
@NonNullByDefault
public class LoopbackDriver<ConfigT extends ChannelDriverConfig<?, ConfigT>> extends ReActorSystemDriver<ConfigT> {
private static final int SUBSCRIBERS_THRESHOLD_TO_USE_ASYNC_PROPAGATION = 6;
private final TriConsumer<ReActorId, Serializable, ReActorRef> propagateToSubscribers = this::propagateMessage;
private final TriConsumer<ReActorId, ReActedMessage, ReActorRef> propagateToSubscribers = this::propagateMessage;
private final LocalDriver<ConfigT> localDriver;
private final ReActorSystem localReActorSystem;
private final ExecutorService fanOutPool;
Expand All @@ -49,12 +49,13 @@ public LoopbackDriver(ReActorSystem reActorSystem, LocalDriver<ConfigT> localDri
}

@Override
public <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef src, ReActorRef dst, PayloadT payload) {
public <PayloadT extends ReActedMessage> DeliveryStatus publish(ReActorRef src, ReActorRef dst, PayloadT payload) {
return publish(src, dst, propagateToSubscribers, payload);
}
@Override
public <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef source, ReActorRef destination,
@Nullable TriConsumer<ReActorId, Serializable, ReActorRef> toSubscribers, PayloadT payload){
public <PayloadT extends ReActedMessage> DeliveryStatus publish(ReActorRef source, ReActorRef destination,
@Nullable TriConsumer<ReActorId, ReActedMessage, ReActorRef> toSubscribers,
PayloadT payload){
ReActorContext dstCtx = localReActorSystem.getReActorCtx(destination.getReActorId());
DeliveryStatus tellResult;
long seqNum = localReActorSystem.getNewSeqNum();
Expand All @@ -80,23 +81,23 @@ public <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef source,
}

@Override
public <PayloadT extends Serializable> DeliveryStatus tell(ReActorRef src, ReActorRef dst, PayloadT payload) {
public <PayloadT extends ReActedMessage> DeliveryStatus tell(ReActorRef src, ReActorRef dst, PayloadT payload) {
return publish(src, dst, DO_NOT_PROPAGATE, payload);
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT payload) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> apublish(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT payload) {
return apublish(src, dst, ackingPolicy, propagateToSubscribers, payload);
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> atell(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT payload) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> atell(ReActorRef src, ReActorRef dst, AckingPolicy ackingPolicy, PayloadT payload) {
return apublish(src, dst, ackingPolicy, DO_NOT_PROPAGATE, payload);
}

@Override
public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef source, ReActorRef destnation, AckingPolicy ackingPolicy,
TriConsumer<ReActorId, Serializable, ReActorRef> toSubscribers, PayloadT payload) {
public <PayloadT extends ReActedMessage> CompletionStage<DeliveryStatus> apublish(ReActorRef source, ReActorRef destnation, AckingPolicy ackingPolicy,
TriConsumer<ReActorId, ReActedMessage, ReActorRef> toSubscribers, PayloadT payload) {
ReActorContext destinationContext = localReActorSystem.getReActorCtx(destnation.getReActorId());
CompletionStage<DeliveryStatus> tellResult;
long seqNum = localReActorSystem.getNewSeqNum();
Expand Down Expand Up @@ -157,15 +158,15 @@ public UnChecked.CheckedRunnable getDriverLoop() {
public final ChannelId getChannelId() { return localDriver.getChannelId(); }

@Override
public <PayloadT extends Serializable> DeliveryStatus
public <PayloadT extends ReActedMessage> DeliveryStatus
sendMessage(ReActorRef src, ReActorContext destinationCtx, ReActorRef destination, long seqNum,
ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT message) {
throw new UnsupportedOperationException();
}
@Override
public Properties getChannelProperties() { return localDriver.getChannelProperties(); }

private void propagateMessage(ReActorId originalDst, Serializable msgPayload, ReActorRef src) {
private void propagateMessage(ReActorId originalDst, ReActedMessage msgPayload, ReActorRef src) {
var subscribers = localReActorSystem.getTypedSubscriptionsManager()
.getLocalSubscribers(msgPayload.getClass());
if (!subscribers.isEmpty()) {
Expand All @@ -180,7 +181,7 @@ private void propagateMessage(ReActorId originalDst, Serializable msgPayload, Re

private void propagateToSubscribers(LocalDriver<ConfigT> localDriver, List<ReActorContext> subscribers,
ReActorId originalDestination, ReActorSystem localReActorSystem,
ReActorRef source, Serializable payload) {
ReActorRef source, ReActedMessage payload) {
for (ReActorContext ctx : subscribers) {
if (!ctx.getSelf().getReActorId().equals(originalDestination)) {
localDriver.sendMessage(source, ctx, ctx.getSelf(), localReActorSystem.getNewSeqNum(),
Expand Down
Loading

0 comments on commit 00a0db3

Please sign in to comment.