Paul Louth edited this page Aug 19, 2016 · 13 revisions

What's the Actor model?

The LanguageExt Process system uses a theory of computation called the Actor Model.

  • An actor is a single-threaded process
  • It has its own blob of state that only it can see and update
  • It has a message queue (inbox)
  • It processes the messages one at a time (single-threaded, remember)
  • When a message comes in, it can change its state; when the next message arrives, it gets that modified state
  • It has a parent Actor
  • It can spawn child Actors
  • It can tell messages to other Actors
  • It can ask for replies from other Actors
  • It's very lightweight: you can create tens of thousands of them without a problem
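To make the bullets above concrete, here is a minimal sketch in plain .NET. It is not LanguageExt.Process and not how the library implements actors; it is a hypothetical actor built from a System.Threading.Channels inbox drained by a single loop, so its private state is only ever touched by one message at a time. The Tell helper is illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical minimal actor: one inbox (a Channel) drained by one consumer loop.
var inbox = Channel.CreateUnbounded<string>();
var seen = new List<string>();   // private state: only the loop below touches it

Task loop = Task.Run(async () =>
{
    // Messages are processed one at a time, in arrival order.
    await foreach (var msg in inbox.Reader.ReadAllAsync())
    {
        seen.Add(msg);
    }
});

// "tell": fire-and-forget enqueue into the inbox (hypothetical helper).
void Tell(string msg) => inbox.Writer.TryWrite(msg);

Tell("hello");
Tell("world");
inbox.Writer.Complete();   // no more messages: lets the loop finish
await loop;                // after this, reading the state is safe

Console.WriteLine(string.Join(",", seen));   // hello,world
```

Because only the loop ever mutates `seen`, no locks are needed; that is the property the actor model gives you.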

So you have a little bundle of self-contained computation, attached to a blob of private state, that can receive messages telling it to do stuff with that state. Sounds like OO, right? Well, it is, but OO as Alan Kay envisioned it. The slight difference is that the actor enforces execution order, so there's no shared state access and no race conditions (within the actor).

Distributed

The messages being sent to actors can also travel between machines, so now we have distributed processes too. This is how to send a message from one process to another on the same machine using LanguageExt.Process:

    tell(processId, "Hello, World!");

Now this is how to send a message from one process to another on a different machine:

    tell(processId, "Hello, World!");

It's the same. Decoupled, thread-safe, without race conditions. What's not to like?

How?

Sometimes this stuff is just easier by example, so here's a quick one. It spawns three processes: one logging process, one that sends a 'ping' message and one that sends a 'pong' message. The ping and pong processes schedule the delivery of their messages to each other every 100 ms. The logger is simply Console.WriteLine:

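    using System;
    using LanguageExt;
    using static LanguageExt.Process;

    // Declare ping and pong up front so each handler can refer to the other
    // (C# requires a captured variable to be definitely assigned first)
    ProcessId ping = default(ProcessId), pong = default(ProcessId);

    // Log process
    var logger = spawn<string>("logger", Console.WriteLine);

    // Ping process: log the message, then send "ping" to pong in 100 ms
    ping = spawn<string>("ping", msg =>
    {
        tell(logger, msg);
        tell(pong, "ping", TimeSpan.FromMilliseconds(100));
    });

    // Pong process: log the message, then send "pong" to ping in 100 ms
    pong = spawn<string>("pong", msg =>
    {
        tell(logger, msg);
        tell(ping, "pong", TimeSpan.FromMilliseconds(100));
    });

    // Trigger
    tell(pong, "start");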

Purely functional programming without the actor model needs, at some point, to deal with the world, and therefore needs statefulness. So you end up with imperative semantics in your functional expressions (unless you use Haskell).

Now you could go the Haskell route, but I think there's something quite perfect about having a bag of state that you run expressions on as messages come in. Essentially it's a fold over a stream.

There are lots of Actor systems out there, so what makes this one any different? Obviously I wanted to create some familiarity, so the differences aren't huge, but they exist. What I felt was missing from other Actor systems was any acknowledgement of anything outside their own system. Now, I know the Actor model is supposed to be a self-contained thing, and that's where its power lies, but in the real world you often need to get information out of it and to interact with existing code, and declaring yet another class just to receive a message was getting a little tedious. So what I've done is:

  • Removed the need to declare new classes for processes (actors)
  • Added a publish system to the processes
  • Made process discovery simple
  • Made a 'functional first' API

Functions if you want them

If your process is stateless, you just provide an Action<TMsg> to handle the messages. If your process is stateful, you provide a Func<TState> setup function and a Func<TState, TMsg, TState> to handle the messages (any seasoned functional programmer will notice that this is the signature of a fold). This makes it easy to create new processes and reduces the cognitive overload of having loads of classes for what should be small packets of computation.
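Because the stateful form is just a setup function plus a (state, message) -> state function, running it over an inbox is a left fold. The sketch below shows that with LINQ's Aggregate; the counter state and the "inc"/"reset" messages are made up for illustration, and a real process applies the function as each message arrives rather than over a finished list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stateful "process": the state is a counter.
Func<int> setup = () => 0;                          // Func<TState>
Func<int, string, int> inbox = (count, msg) =>      // Func<TState, TMsg, TState>
    msg == "inc"   ? count + 1 :
    msg == "reset" ? 0 :
    count;                                          // unknown messages leave state unchanged

var messages = new List<string> { "inc", "inc", "reset", "inc", "ignored" };

// Processing the whole inbox, one message at a time, is a left fold.
int finalState = messages.Aggregate(setup(), inbox);

Console.WriteLine(finalState);   // 1
```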

You still need to create classes for messages and the like; that's unavoidable (use F# to create a 'messages' project, it's much quicker and easier). But it's also desirable, because it's the messages that define the interface and the interaction, not the processing class.

Creating something to log string messages to the console is as easy as:

    ProcessId log = spawn<string>("logger", Console.WriteLine);

    tell(log, "Hello, World");

Or if you want a stateful, thread-safe cache:

    using System;
    using LanguageExt;
    using static LanguageExt.Prelude;
    using static LanguageExt.Process;

    static class Cache
    {
        enum Tag
        {
            Add,
            Remove,
            Get,
            Flush
        }

        class Msg
        {
            public Tag Tag;
            public string Key;
            public ExpiringValue Value;
        }

        class ExpiringValue
        {
            public DateTime Expiry;
            public string Value;
        }

        public static Unit Add(ProcessId pid, string key, string value) =>
            tell(pid, new Msg { Tag = Tag.Add, Key = key, Value = new ExpiringValue { Value = value, Expiry = DateTime.UtcNow.AddMinutes(1) }});

        public static Unit Remove(ProcessId pid, string key) =>
            tell(pid, new Msg { Tag = Tag.Remove, Key = key });

        // Ask for a reply of type string (the cached value)
        public static string Get(ProcessId pid, string key) =>
            ask<string>(pid, new Msg { Tag = Tag.Get, Key = key });

        public static Unit Flush(ProcessId pid) =>
            tell(pid, new Msg { Tag = Tag.Flush });

        public static ProcessId Spawn(ProcessName name) =>
            // Argument 1 is the name of the process
            // Argument 2 is the setup function: returns a new empty cache (Map)
            // Argument 3 checks the message type and updates the state, except when
            //            it's a 'Get', in which case it finds the cache item and, if
            //            it exists, calls 'reply', then returns the state untouched.
            //            'Flush' keeps only the items that haven't expired yet.
            spawn<Map<string, ExpiringValue>, Msg>(
                name,
                () => Map<string, ExpiringValue>(),
                (state, msg) =>
                    match(msg.Tag,
                        with(Tag.Add,    _ => state.AddOrUpdate(msg.Key, msg.Value)),
                        with(Tag.Remove, _ => state.Remove(msg.Key)),
                        with(Tag.Get,    _ => state.Find(msg.Key).Map(v => v.Value).IfSome(reply).Return(state)),
                        with(Tag.Flush,  _ => state.Filter(s => s.Expiry > DateTime.UtcNow))));
    }

The ProcessId is just a wrapped string path, so you can serialise it and pass it around; anything that has it can then find and communicate with your cache:

    var pid = Cache.Spawn("my-cache");
    
    // Add a new item to the cache
    Cache.Add(pid, "test", "hello, world");
    
    // Get an item from the cache
    var thing = Cache.Get(pid, "test");

    // Remove an item from the cache
    Cache.Remove(pid, "test");

Periodically you will probably want to flush the cache contents. Just fire up another process; they're basically free (and, because they're functions rather than classes, very easy to put into little worker modules):

    public void SpawnCacheFlush(ProcessId cache)
    {
        // Spawns a process that tells the cache process to flush, and then sends
        // itself a message in 10 minutes which causes it to run again.
        
        var flush = spawn<Unit>(
            "cache-flush", _ =>
            {
                Cache.Flush(cache);
                tellSelf(unit, TimeSpan.FromMinutes(10));
            });

        // Start the process running
        tell(flush, unit); 
    }

Classes if you want them

For those who prefer the class-based approach, or would at least prefer it for larger, more complex processes, there is an interface proxy system. The previous Cache example, which has quite a bit of boilerplate because of C#'s lack of discriminated unions and pattern matching, could be implemented thus:

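    public interface ICache
    {
        void Add(string key, string value, DateTime expires);
        void Remove(string key);
        string Get(string key);
        void Flush();
    }

    public class ExpiringValue
    {
        public readonly string Value;
        public readonly DateTime Expiry;

        public ExpiringValue(string value, DateTime expiry)
        {
            Value = value;
            Expiry = expiry;
        }
    }

    public class Cache : ICache
    {
        Map<string, ExpiringValue> state = Map.empty<string, ExpiringValue>();
        
        public void Add(string key, string value, DateTime expires)
        {
            // AddOrUpdate, so re-adding an existing key replaces it rather than throwing
            state = state.AddOrUpdate(key, new ExpiringValue(value, expires));
        }
        
        public void Remove(string key)
        {
            state = state.Remove(key);
        }
        
        public string Get(string key)
        {
            // The state maps keys to ExpiringValue, so unwrap the string
            return state[key].Value;
        }
        
        // Keep only the items that haven't expired yet
        public void Flush()
        {
            state = state.Filter(s => s.Expiry > DateTime.UtcNow);
        }
    }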

Use it like so:

    // Spawn the Cache process with a state-type of Cache - it accepts the ProxyMsg
    // type for messages which are auto-unpacked and used to invoke methods on the
    // Cache state object.
    ProcessId pid = spawn<Cache>("cache");
    
    // Generate an ICache proxy for calling the methods on Cache.  The proxy function
    // maps the interface onto tell and ask calls, and packs up the method dispatch
    // requests into ProxyMsgs.  It also does a type-check to make sure the Process's
    // state-type actually implements ICache.
    ICache cache = proxy<ICache>(pid);

    // Call the ICache.Add method.  This is translated into a Process.tell 
    cache.Add("test", "hello, world", DateTime.UtcNow.AddMinutes(10));
    
    // Get an item from the cache.  This is translated into a Process.ask
    var thing = cache.Get("test");

    // Remove an item from the cache
    cache.Remove("test");

You could continue to use a stand-alone flush process, but it would need to use the proxy to communicate:

    var flush = spawn<Unit>(
        "cache-flush", _ =>
        {
            proxy<ICache>(pid).Flush();
            tellSelf(unit, TimeSpan.FromMinutes(10));
        });

The proxy can be built from anywhere: the Process system will auto-generate a concrete implementation of the interface that dispatches to the specified Process. It also type-checks your interface against the state type the Process is actually running, adding an extra bit of type safety to the procedure.

If you only need to work with the Process locally, then you can short-cut and go straight to the proxy:

    ICache cache = spawn<ICache>("cache", () => new Cache());

With the proxy approach we are back in the imperative world, but in some circumstances that is more valuable. If you imagine that each method on ICache is actually an inbox handler, you'll realise we still have the protection of single-threaded access, so the mutable nature of the Process state isn't the concern it would be in a regular class.

As you can see, that's a pretty powerful technique. Remember the process could be running on another machine, and as long as the messages serialise you can talk to it by process ID or via proxy.

Publish system

Most other actor systems expect you to tell all messages directly to other actors. If you want a pub-sub model then you're required to create a publisher actor that can take subscription messages from other actors; the publisher actor then manages a 'registry' of subscribers to deliver messages to. It's all a bit bulky and unnecessary.

So with LanguageExt.Process each process manages its own internal subscriber list. If a process needs to announce something it calls:

    // Publish a message for anyone listening
    publish(msg);

Another process can subscribe to that by calling:

    subscribe(processId);

(The subscriber can do this in its setup phase; the process system will automatically unsubscribe when the process dies and resubscribe when it restarts.)

This means the messages that are published by one process can be consumed by any number of others (via their inbox in the normal way).
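The idea that each process owns its own subscriber list can be sketched in a few lines of plain C#. This is a hypothetical illustration, not the LanguageExt.Process implementation: the Subscribe/Publish helpers are invented, subscribers are plain callbacks standing in for inboxes, and delivery is synchronous for brevity, whereas real subscribers receive published messages through their inboxes.

```csharp
using System;
using System.Collections.Generic;

// Owned by the publishing process: no separate "publisher actor" needed.
var subscribers = new List<Action<string>>();

// Hypothetical helpers standing in for subscribe(...) and publish(...)
void Subscribe(Action<string> inbox) => subscribers.Add(inbox);
void Publish(string msg)
{
    // Deliver the message to every subscriber.
    foreach (var inbox in subscribers) inbox(msg);
}

// Two independent consumers.
var logA = new List<string>();
var logB = new List<string>();
Subscribe(logA.Add);
Subscribe(logB.Add);

Publish("cache-flushed");
Console.WriteLine($"{logA.Count} {logB.Count}");   // 1 1
```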

However, sometimes you want to jump outside of that system. For example, if your code is outside of the process system, it can get an IObservable stream instead:

    var sub = observe<Thing>(processId).Subscribe(msg => ...);

A good example of this is the 'Dead Letters' process. It gets all the messages that failed for one reason or another (serialisation problems, the process doesn't exist, the process crashed, etc.). All it does is call publish(msg), which allows you to subscribe to it for logging purposes. This is how it's defined:

    var deadLetters = spawn<DeadLetter>("dead-letters", publish);

That's it! That's all a key piece of infrastructure needs. It's then easy to listen for and log issues, or to hook it up to a process that persists the dead-letter messages.

'Discoverability'

Being able to find other Processes in a cluster (or within the same AppDomain) and dispatch or route to them is essential. There's a supervision hierarchy: a root process, then a child user process under which you create your processes, which in turn create child processes, forming a tree structure that you can use to route messages locally.

There's also a system process under root that handles things like dead letters and various other housekeeping tasks.

    /root/user/...
    /root/system/dead-letters
    etc.

When you initialise a Redis cluster connection, the second argument is the role of the node in the cluster and the third argument is the name of the node (i.e. the name of the app/service/website, whatever it is). Roles are what the cluster dispatch methods work with (see Role.Broadcast, Role.LeastBusy, Role.Random, Role.RoundRobin, Role.First). There is a static property, Process.ClusterNodes, that allows you to interrogate which nodes are online and what their roles are.

    RedisCluster.register();
    ProcessConfig.initialise("sys", "web-front-end", "web-front-end-1", "localhost", "0");

  • "sys" is the 'system name', but it's easier to think of it as the name of the cluster as a whole. That means you can call it with a different value and point it at another Redis db for multiple clusters. But for now it's easier to call it sys and leave it.
  • "web-front-end" is the role; you can have multiple nodes in a role, and the role-based dispatchers allow you to implement high-availability and load-balancing strategies.
  • "web-front-end-1" is the name of this node, and should be unique in the cluster
  • "localhost" is the Redis connection (can be comma separated for multiple Redis nodes)
  • "0" is the Redis catalogue (database number) to use ("0" to "15")
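The role-based dispatch methods mentioned above differ only in how they pick target nodes within a role. The following is a rough, hypothetical sketch in plain C#, not how the Role dispatchers are implemented: the node names follow the example above, but the "queued messages" figures and the Pick* helpers are invented, and "busy" is assumed here to mean messages waiting in a node's inbox.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical nodes in the "web-front-end" role, with a made-up busyness figure.
var nodes = new List<(string Name, int Queued)>
{
    ("web-front-end-1", 5),
    ("web-front-end-2", 1),
    ("web-front-end-3", 3),
};

// Broadcast: every node in the role gets the message.
IEnumerable<string> Broadcast() => nodes.Select(n => n.Name);

// First: the first node in the role.
string PickFirst() => nodes[0].Name;

// RoundRobin: cycle through the nodes in turn.
int next = 0;
string PickRoundRobin() => nodes[next++ % nodes.Count].Name;

// LeastBusy: the node with the fewest queued messages.
string PickLeastBusy() => nodes.OrderBy(n => n.Queued).First().Name;

// Random: any node in the role.
var rng = new Random();
string PickRandom() => nodes[rng.Next(nodes.Count)].Name;

Console.WriteLine(string.Join(",", Broadcast()));
Console.WriteLine(PickFirst());       // web-front-end-1
Console.WriteLine(string.Join(",", Enumerable.Range(0, 4).Select(_ => PickRoundRobin())));
// web-front-end-1,web-front-end-2,web-front-end-3,web-front-end-1
Console.WriteLine(PickLeastBusy());   // web-front-end-2
Console.WriteLine(PickRandom());
```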

Then, instead of having root as the top-level Process in your hierarchy, you have the node name, web-front-end-1:

    /web-front-end-1/user/...
    /web-front-end-1/system/dead-letters

Therefore you know where things are, and what they're called, and they're easily addressable. You can just 'tell' the address:

    tell("/web-front-end-1/user/hello", "Hello!");

Or you can use the ProcessId API to build the path:

    ProcessId a = "/web-front-end-1/user/hello";
    ProcessId b = ProcessId.Top["web-front-end-1"]["user"]["hello"];
    // a == b

Even that isn't great if you don't know the name of the 'app' that is running a Process. So processes can register under a single name that goes into a 'shared namespace'. It's a kind of DNS for processes:

    /disp/reg/<name>

To register:

    register(myProcessId, "hello-world");

This goes in:

    /disp/reg/hello-world

Your process now has two addresses: the /web-front-end-1/user/hello-world address and the /disp/reg/hello-world address, which anyone can find by calling find("hello-world"). This makes it very simple to bootstrap processes and get messages to them, even if you don't know which system is actually dealing with them:

    tell(find("hello-world"), "Hi!");
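The shared namespace behaves much like a name-to-address lookup table. Here is a hypothetical plain-C# sketch of that idea: the dictionary and the Register/Find/Resolve helpers are stand-ins invented for illustration, and this sketch keeps only one process per name. Following the text, finding a name yields its /disp/reg/ address, which is then resolved to the real path when a message is sent.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the shared namespace: registered name -> real process path.
var registry = new Dictionary<string, string>();

// Register a process path under a single, cluster-wide name.
void Register(string pid, string name) => registry[name] = pid;

// Finding a name gives its address in the shared namespace.
string Find(string name) => $"/disp/reg/{name}";

// A /disp/reg/ address is resolved to the registered path; other paths pass through.
string Resolve(string address)
{
    const string prefix = "/disp/reg/";
    if (!address.StartsWith(prefix, StringComparison.Ordinal))
        return address;
    var name = address.Substring(prefix.Length);
    return registry.TryGetValue(name, out var pid)
        ? pid
        : throw new KeyNotFoundException($"No process registered as '{name}'");
}

Register("/web-front-end-1/user/hello-world", "hello-world");
Console.WriteLine(Find("hello-world"));            // /disp/reg/hello-world
Console.WriteLine(Resolve(Find("hello-world")));   // /web-front-end-1/user/hello-world
```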

Along with routers, dispatchers and roles, this makes finding, routing and dispatching to other nodes in the cluster trivial. For a full discussion of routing, roles and dispatchers see here

Persistence

There is an ICluster interface that you can use to implement your own persistence layer. However, out of the box there is persistence to Redis (using LanguageExt.Process.Redis).

You can optionally persist:

  • Inbox messages
  • Process state

Here's an example of persisting the inbox:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistInbox);

Here's an example of persisting the state:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistState);

Here's an example of persisting both:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistAll);
