Skip to content

Commit

Permalink
#7 continue to implement the category saga
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Mar 25, 2017
1 parent 0ce71d3 commit 2514ff5
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/Cik.Magazine.CategoryService/CategoryService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Akka.Actor;
using Akka.Persistence.Fsm;
using Cik.Magazine.Shared.Messages.Category;
using MongoDB.Bson.Serialization;
using Serilog;
Expand Down
6 changes: 3 additions & 3 deletions src/Cik.Magazine.CategoryService/CategoryServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public static IActorRef CategoryCommanderAggregate(this IActorRefFactory system,
.Props(Props.Create<ReadModelProjections>());
var projections = system.ActorOf(projectionsProps, $"{nameofProjectionActor}-{nameOfCommanderActor}");

var processManagerProps = new ConsistentHashingPool(10)
.Props(Props.Create(() => new CategoryProcessManager(id)));
var processManager = system.ActorOf(processManagerProps, $"{nameOfProcessManagerActor}");
/*var processManagerProps = new ConsistentHashingPool(1)
.Props(Props.Create(() => new CategoryProcessManager(id)));*/
var processManager = system.ActorOf(Props.Create(() => new CategoryProcessManager(id)));

var creationParams = new AggregateRootCreationParameters(id, projections,
new HashSet<IActorRef>(new List<IActorRef> {processManager}), snapshotThreshold);
Expand Down
8 changes: 7 additions & 1 deletion src/Cik.Magazine.CategoryService/Domain/Category.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ public class Category : AggregateRootActor
public Category(AggregateRootCreationParameters parameters)
: base(parameters)
{
_state = new CategoryState {EventSink = this, ProcessManagers = ProcessManagers};
_state = new CategoryState {EventSink = this};
}

protected override bool Handle(ICommand command)
{
_state.Handle(command);

// span out for all the process managers
foreach (var pm in ProcessManagers)
{
pm.Tell(command, Sender);
}
return true;
}

Expand Down
9 changes: 1 addition & 8 deletions src/Cik.Magazine.CategoryService/Domain/CategoryState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System.Collections.Generic;
using Akka.Actor;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Domain;
using Cik.Magazine.Shared.Messages.Category;
using Status = Cik.Magazine.Shared.Messages.Category.Status;
Expand All @@ -12,7 +10,6 @@ public class CategoryState
public string Name { get; private set; }
public Status Status { get; private set; }
internal IEventSink EventSink { get; set; }
internal ISet<IActorRef> ProcessManagers { get; set; }

public void Handle(ICommand command)
{
Expand All @@ -27,10 +24,6 @@ public void Mutate(IEvent @event)
public void Handle(CreateCategory message)
{
EventSink.Publish(new CategoryCreated(message.AggregateId, message.Name, message.Status));
foreach (var pm in ProcessManagers)
{
pm.Tell(message.Status);
}
}

public void Handle(UpdateCategory message)
Expand Down
11 changes: 6 additions & 5 deletions src/Cik.Magazine.CategoryService/Query/CategoryQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.IO;
using Akka.Actor;
using Akka.Event;
using Cik.Magazine.CategoryService.Sagas;
using Cik.Magazine.Shared.Messages.Category;
using Cik.Magazine.Shared.Queries;
using MongoDB.Driver;

Expand Down Expand Up @@ -33,12 +35,11 @@ public void Handle(CategoryViewRequest message)

public void Handle(ListCategoryViewRequest message)
{
// var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager123(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"))));
// saga.Tell()
// var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"))));
// saga.Tell(Shared.Messages.Category.Status.Reviewing);
// saga.Tell(new CreateCategory(Guid.NewGuid(), "cat1", Shared.Messages.Category.Status.Draft));

/*var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager()));
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
/* saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);*/
Expand Down
123 changes: 90 additions & 33 deletions src/Cik.Magazine.CategoryService/Sagas/CategoryProcessManager.cs
Original file line number Diff line number Diff line change
@@ -1,94 +1,151 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Akka;
using Akka.Event;
using Akka.Persistence;
using Akka.Persistence.Fsm;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Messages.Category;
using Status = Cik.Magazine.Shared.Messages.Category.Status;
using MongoDB.Bson.Serialization;

namespace Cik.Magazine.CategoryService.Sagas
{
public class CategoryData
{
public Guid Id { get; set; }
public Status Status { get; set; }
public Status Status { get; set; }
}

public class CategoryProcessManager : PersistentFSM<Status, CategoryData, Event>
/// <summary>
/// CategoryProcessManager workflow for this case as
/// 1. [CategoryAggreation] submits CreateCategory command to [CategoryProcessManager] (status is Draft)
/// 2. [CategoryProcessManager] sends an email to [Admin] for an approval process (status is WaitingForApproval)
/// 2.1 This process will run periodic if [Admin] doesn't do the approval process (Quatz)
/// 3. [Admin] knows about it, then submits the ApproveCategory command to the [CategoryAggreation]
/// 4. [CategoryAggreation] updates itself status to Published and tells [CategoryProcessManager] about it
/// 5. [CategoryProcessManager] sends an email to notify with [Admin], and triggers CategoryApproved event for
/// persistence it into the storage
/// </summary>
public class CategoryProcessManager : PersistentFSM<Status, List<CategoryData>, Event>
{
private readonly List<CategoryData> _data = new List<CategoryData>();
private readonly Guid _id;
private readonly CategoryData _data = new CategoryData();
private readonly ILoggingAdapter _log;
private long LastSnapshottedVersion { get; set; }

public CategoryProcessManager(Guid id)
{
_id = id;
_log = Context.GetLogger();

StartWith(Status.Reviewing, _data);
When(Status.Reviewing, (e, state) =>
if (!BsonClassMap.IsClassMapRegistered(typeof(StateChangeEvent)))
BsonClassMap.RegisterClassMap<StateChangeEvent>();

StartWith(Status.Draft, _data);

When(Status.Draft, (e, state) =>
{
if (e.FsmEvent is CategoryCreated)
if (e.FsmEvent is CreateCategory)
{
var oldEvent = (CategoryCreated)e.FsmEvent;
return GoTo(Status.Published)
.Applying(new CategoryStatusUpdated(oldEvent.AggregateId, Status.Published));
var oldEvent = (CreateCategory) e.FsmEvent;
return GoTo(Status.WaitingForApproval)
.Applying(new CategoryStatusUpdated(oldEvent.AggregateId, Status.WaitingForApproval));
}

return state;
});
When(Status.Published, (e, state) =>

When(Status.Draft, (e, state) =>
{
// TODO: 1. handle e.FsmEvent is ApproveCategory
// TODO: 2. GoTo(Status.Published)
// TODO: 3. Applying(new CategoryApproved(oldEvent.AggregateId, Status.Published))

return state;
});

When(Status.Published, (e, state) => { return state; });
}

public override string PersistenceId => $"category-process-manager-{_id}";
private long LastSnapshottedVersion { get; set; }

protected override bool ReceiveCommand(object message)
{
return message.Match()
.With<IEvent>(@event =>
{
Persist(@event, e =>
{
});
}).WasHandled;
}
public override string PersistenceId => $"category-process-manager-{_id}";

protected override bool ReceiveRecover(object message)
{
return message.Match()
.With<CategoryStatusUpdated>(@event =>
{
_data.Id = @event.AggregateId;
_data.Status = @event.Status;
if (!_data.Any())
{
_data.Add(new CategoryData
{
Id = @event.AggregateId,
Status = @event.Status
});
}
else
{
var temps = _data.ConvertAll(x => x);
foreach (var categoryData in temps)
if (categoryData.Id.Equals(@event.AggregateId))
categoryData.Status = @event.Status;
else
_data.Add(new CategoryData
{
Id = @event.AggregateId,
Status = @event.Status
});
}
})
.With<RecoveryCompleted>(() =>
{
_log.Debug("[PM] Recovered state to version {0}", LastSequenceNr);
OnRecoveryCompleted();
})
.With<SnapshotOffer>(offer =>
{
LastSnapshottedVersion = offer.Metadata.SequenceNr;
}).WasHandled;
.With<SnapshotOffer>(offer => { LastSnapshottedVersion = offer.Metadata.SequenceNr; }).WasHandled;
}

protected override CategoryData ApplyEvent(Event e, CategoryData data)
protected override List<CategoryData> ApplyEvent(Event e, List<CategoryData> data)
{
if (e is CategoryStatusUpdated)
{
// TODO: send notification to sys-admin for approve the category
var evented = (CategoryStatusUpdated) e;
// TODO: refactor later
if (!_data.Any())
{
_data.Add(new CategoryData
{
Id = evented.AggregateId,
Status = evented.Status
});
}
else
{
var temps = _data.ConvertAll(x => x);
foreach (var categoryData in temps)
if (categoryData.Id.Equals(evented.AggregateId))
categoryData.Status = evented.Status;
else
_data.Add(new CategoryData
{
Id = evented.AggregateId,
Status = evented.Status
});
}

return data;
}
return data;
}

protected override void OnRecoveryCompleted()
{

foreach (var categoryData in _data)
if (categoryData.Status == Status.WaitingForApproval)
{
// TODO: [side-effects] send notification to sys-admin for approve the category
// TODO: e.g EmailActor.Tell(SendEmail())
}
}
}
}
24 changes: 8 additions & 16 deletions src/Cik.Magazine.CategoryService/Sagas/ExampleProcessManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@ public ExampleProcessManager()
{
Id = Guid.NewGuid(),
Name = "Sample",
Status = Status.Reviewing
Status = Status.Draft
};

StartWith(Status.Reviewing, initData);
StartWith(Status.Draft, initData);

When(Status.Reviewing, @event =>
When(Status.Draft, @event =>
{
Console.WriteLine("Run Reviewing");
if (@event.StateData.Status == Status.Reviewing)
Console.WriteLine("Run Draft");
if (@event.StateData.Status == Status.Draft)
{
@event.StateData.Status = Status.Published;
return GoTo(Status.Published)
.Using(initData);
}
else
{
Console.WriteLine("Already Reviewed.");
}
Console.WriteLine("Already Drafted.");
return null;
}, TimeSpan.FromSeconds(5));

Expand All @@ -45,8 +42,8 @@ public ExampleProcessManager()
Console.WriteLine("Run Published");
if (@event.StateData.Status == Status.Published)
{
@event.StateData.Status = Status.Reviewing;
return GoTo(Status.Reviewing)
@event.StateData.Status = Status.Draft;
return GoTo(Status.Draft)
.Using(initData);
}
else
Expand All @@ -64,10 +61,5 @@ public ExampleProcessManager()
Initialize();

}

protected override bool Receive(object message)
{
return base.Receive(message);
}
}
}
3 changes: 2 additions & 1 deletion src/Cik.Magazine.Shared/Messages/Category/Status.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
{
public enum Status
{
Reviewing,
Draft,
WaitingForApproval,
Published
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public CategoryStatusUpdated(Guid aggregateId, Status status)
Status = status;
}

public Status Status { get; }
public Status Status { get; private set; }
}
}

0 comments on commit 2514ff5

Please sign in to comment.