-
Notifications
You must be signed in to change notification settings - Fork 0
ask
is a request/response system for processes. You can ask a process a question (a message) and it can reply using the Process.reply
function. It doesn't have to and ask
will timeout after ActorConfig.Default.Timeout
seconds.
ask
is blocking, all messages ahead of you in the queue must be dealt with first and could cause ask
to block longer than you might hope if the Process
inbox is large. You should consider using tell
instead, it is fire and forget. If you need to know the result of an operation you can also consider Process.observe()
and Process.subscribe()
, they allow for subscription to anything that a Process
publishes via Process.publish()
. You can also subscribe to the state changes of a Process
using Process.observeState()
and Process.subscribeState()
.
class Cache<K, V>
{
enum Tag
{
Add,
Remove,
Get,
Flush
}
class Msg
{
public Tag Tag;
public K Key;
public ExpiringValue Value;
}
class ExpiringValue
{
public DateTime Expiry;
public V Value;
}
public static Unit Add(ProcessId pid, K key, V value) =>
tell(pid, new Msg { Tag = Tag.Add, Key = key, Value = new ExpiringValue { Value = value, Expiry = DateTime.UtcNow }});
public static Unit Remove(ProcessId pid, K key) =>
tell(pid, new Msg { Tag = Tag.Remove, Key = key });
public static V Get(ProcessId pid, K key) =>
ask<V>(pid, new Msg { Tag = Tag.Get, Key = key });
public static Unit Flush(ProcessId pid) =>
tell(pid, new Msg { Tag = Tag.Flush });
public static ProcessId Spawn(ProcessName name) =>
// Argument 1 is the name of the process
// Argument 2 is the setup function: returns a new empty cache (Map)
// Argument 3 checks the message type and updates the state except when
// it's a 'Get' in which case it Finds the cache item and if
// it exists, calls 'reply', and then returns the state
// untouched.
spawn<Map<K, ExpiringValue>, Msg>(
name,
() => Map<K, ExpiringValue>(),
(state, msg) =>
match(msg.Tag,
with(Tag.Add, _ => state.AddOrUpdate(msg.Key, msg.Value)),
with(Tag.Remove, _ => state.Remove(msg.Key)),
with(Tag.Get, _ => state.Find(msg.Key).Map(v => v.Value).IfSome(reply).Return(state)),
with(Tag.Flush, _ => state.Filter(s => s.Expiry < DateTime.Now))));
}