Skip to content

Commit

Permalink
#7 continue to implement the Saga for review category status
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Mar 13, 2017
1 parent 3014360 commit 32e1e4b
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 55 deletions.
6 changes: 5 additions & 1 deletion src/Cik.Magazine.CategoryService/CategoryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,27 @@ public CategoryService(ILogger logger)
public bool Start(HostControl hostControl)
{
_logger.Information("Create an actor system, query and commander.");

GlobalActorSystem = ActorSystem.Create("magazine-system");
CategoryCommanderActor = GlobalActorSystem.CategoryCommanderAggregate(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"));
CategoryQueryActor = GlobalActorSystem.CategoryQueryAggregate();

// config for mongo
// TODO: need to scan all the events and map it to BsonClassMap
BsonClassMap.RegisterClassMap<CategoryCreated>();
BsonClassMap.RegisterClassMap<CategoryStatusUpdated>();

return true;
}

public bool Stop(HostControl hostControl)
{
_logger.Information("Release the actor system, query and commander resources.");
_logger.Information("Release the actor system, query, commander and process manager resources.");

CategoryCommanderActor.Tell(PoisonPill.Instance);
CategoryQueryActor.Tell(PoisonPill.Instance);
GlobalActorSystem.Terminate();

return true;
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/Cik.Magazine.CategoryService/CategoryServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Cik.Magazine.CategoryService.Denomalizer.Projections;
using Cik.Magazine.CategoryService.Domain;
using Cik.Magazine.CategoryService.Query;
using Cik.Magazine.CategoryService.Sagas;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Domain;

Expand All @@ -16,10 +17,17 @@ public static IActorRef CategoryCommanderAggregate(this IActorRefFactory system,
{
var nameOfCommanderActor = SystemData.CategoryCommanderActor.Name;
var nameofProjectionActor = SystemData.CategoryProjectionsActor.Name;
var nameOfProcessManagerActor = "category-process-manager";

// build up the category actor
var projectionsProps = new ConsistentHashingPool(10).Props(Props.Create<ReadModelProjections>());
var projections = system.ActorOf(projectionsProps, $"{nameofProjectionActor}-{nameOfCommanderActor}");
var creationParams = new AggregateRootCreationParameters(id, projections, snapshotThreshold);

// TODO: need to have a way to inject commander into process manager for trigger event back to commander
var processManagerProps = new ConsistentHashingPool(10).Props(Props.Create(() => new CategoryProcessManager(id, null)));
var processManager = system.ActorOf(processManagerProps, $"{nameOfProcessManagerActor}");

var creationParams = new AggregateRootCreationParameters(id, projections, processManager, snapshotThreshold);
return system.ActorOf(Props.Create<Category>(creationParams), nameOfCommanderActor);
}

Expand Down
11 changes: 10 additions & 1 deletion src/Cik.Magazine.CategoryService/Query/CategoryQuery.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Runtime.Remoting.Contexts;
using System.Threading;
using Akka.Actor;
using Akka.Event;
using Cik.Magazine.CategoryService.Sagas;
Expand Down Expand Up @@ -34,7 +35,15 @@ public void Handle(CategoryViewRequest message)

public void Handle(ListCategoryViewRequest message)
{
var saga = Context.ActorOf(Props.Create(() => new ReviewCategorySaga(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"))));
// var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager123(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"))));
// saga.Tell()

/*var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager()));
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);*/

_log.Info("Received message[{0}] and query data in NoSQL data source.", message.GetType().Name);
var db = _mongoClient.GetDatabase("magazine");
Expand Down
89 changes: 89 additions & 0 deletions src/Cik.Magazine.CategoryService/Sagas/CategoryProcessManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence;
using Akka.Persistence.Fsm;
using Cik.Magazine.CategoryService.Domain;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Messages.Category;
using Status = Cik.Magazine.Shared.Messages.Category.Status;

namespace Cik.Magazine.CategoryService.Sagas
{
public class CategoryProcessManager : PersistentFSM<Status, CategoryState, Event>
{
private readonly Guid _id;
private readonly IActorRef _commander;
private readonly CategoryState _state = new CategoryState();
private readonly ILoggingAdapter _log;
private long LastSnapshottedVersion { get; set; }

public CategoryProcessManager(Guid id, IActorRef commander)
{
_id = id;
_commander = commander;
_log = Context.GetLogger();

StartWith(Status.Reviewing, _state);
When(Status.Reviewing, (e, state) =>
{
if (e.FsmEvent is CategoryCreated)
{
var oldEvent = (CategoryCreated)e.FsmEvent;
return GoTo(Status.Published)
.Applying(new CategoryStatusUpdated(oldEvent.AggregateId, Status.Published));
}

return state;
});
When(Status.Published, (e, state) =>
{
// TODO: do the actions like send email to notify or something else
return state;
});
}

public override string PersistenceId => $"category-process-manager-{_id}";

protected override bool ReceiveCommand(object message)
{
return message.Match()
.With<IEvent>(@event =>
{
Persist(@event, e => { });
}).WasHandled;
}

protected override bool ReceiveRecover(object message)
{
return message.Match()
.With<CategoryCreated>(@event =>
{
_state.Apply(@event);
})
.With<RecoveryCompleted>(() =>
{
_log.Debug("[PM] Recovered state to version {0}", LastSequenceNr);
})
.With<SnapshotOffer>(offer =>
{
LastSnapshottedVersion = offer.Metadata.SequenceNr;
}).WasHandled;
}

protected override CategoryState ApplyEvent(Event e, CategoryState data)
{
if (e is CategoryStatusUpdated)
{
return data;
}
return data;
}

protected override void OnRecoveryCompleted()
{

}
}
}
73 changes: 73 additions & 0 deletions src/Cik.Magazine.CategoryService/Sagas/ExampleProcessManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using Akka.Actor;
using Status = Cik.Magazine.Shared.Messages.Category.Status;

namespace Cik.Magazine.CategoryService.Sagas
{
public class Data
{
public Guid Id { get; set; }
public string Name { get; set; }
public Status Status { get; set; }
}

public class ExampleProcessManager : FSM<Status, Data>
{
public ExampleProcessManager()
{
var initData = new Data
{
Id = Guid.NewGuid(),
Name = "Sample",
Status = Status.Reviewing
};

StartWith(Status.Reviewing, initData);

When(Status.Reviewing, @event =>
{
Console.WriteLine("Run Reviewing");
if (@event.StateData.Status == Status.Reviewing)
{
@event.StateData.Status = Status.Published;
return GoTo(Status.Published)
.Using(initData);
}
else
{
Console.WriteLine("Already Reviewed.");
}
return null;
}, TimeSpan.FromSeconds(5));

When(Status.Published, @event =>
{
Console.WriteLine("Run Published");
if (@event.StateData.Status == Status.Published)
{
@event.StateData.Status = Status.Reviewing;
return GoTo(Status.Reviewing)
.Using(initData);
}
else
{
Console.WriteLine("Change back to Review.");
}
return null;
}, TimeSpan.FromSeconds(5));

OnTransition((a, b) =>
{

});

Initialize();

}

protected override bool Receive(object message)
{
return base.Receive(message);
}
}
}
51 changes: 0 additions & 51 deletions src/Cik.Magazine.CategoryService/Sagas/ReviewCategorySaga.cs

This file was deleted.

7 changes: 6 additions & 1 deletion src/Cik.Magazine.Shared/Domain/AggregateRootActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ namespace Cik.Magazine.Shared.Domain
{
public class AggregateRootCreationParameters
{
public AggregateRootCreationParameters(Guid id, IActorRef projections, int snapshotThreshold = 250)
public AggregateRootCreationParameters(Guid id, IActorRef projections, IActorRef processManager, int snapshotThreshold = 250)
{
Id = id;
Projections = projections;
ProcessManager = processManager;
SnapshotThreshold = snapshotThreshold;
}

public Guid Id { get; }
public IActorRef Projections { get; }
public IActorRef ProcessManager { get; }
public int SnapshotThreshold { get; }
}

Expand All @@ -26,12 +28,14 @@ public abstract class AggregateRootActor : PersistentActor, IEventSink
private readonly Guid _id;
private readonly ILoggingAdapter _log;
private readonly IActorRef _projections;
private readonly IActorRef _processManager;
private readonly int _snapshotThreshold;

protected AggregateRootActor(AggregateRootCreationParameters parameters)
{
_id = parameters.Id;
_projections = parameters.Projections;
_processManager = parameters.ProcessManager;
_snapshotThreshold = parameters.SnapshotThreshold;

_log = Context.GetLogger();
Expand All @@ -47,6 +51,7 @@ void IEventSink.Publish(IEvent @event)
{
Apply(e);
_projections.Tell(@event);
_processManager.Tell(@event);
Self.Tell(SaveAggregate.Message); // save the snapshot if it is possible
});
}
Expand Down

0 comments on commit 32e1e4b

Please sign in to comment.