Skip to content

Commit

Permalink
#7 add Process Manager host
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Mar 28, 2017
1 parent 2514ff5 commit 8b92e4f
Show file tree
Hide file tree
Showing 22 changed files with 403 additions and 51 deletions.
15 changes: 15 additions & 0 deletions MagazineWebAkka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{71DC390E-4
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cik.Magazine.Benchmark.Console", "src\Cik.Magazine.Benchmark.Console\Cik.Magazine.Benchmark.Console.csproj", "{D98E220B-165F-4A26-AC1D-847FDC4B1710}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cik.Magazine.ProcessManager", "src\Cik.Magazine.ProcessManager\Cik.Magazine.ProcessManager.csproj", "{854BE079-6DA2-4138-B707-6627315C46F6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -159,6 +161,18 @@ Global
{D98E220B-165F-4A26-AC1D-847FDC4B1710}.Release|x64.Build.0 = Release|Any CPU
{D98E220B-165F-4A26-AC1D-847FDC4B1710}.Release|x86.ActiveCfg = Release|Any CPU
{D98E220B-165F-4A26-AC1D-847FDC4B1710}.Release|x86.Build.0 = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|x64.ActiveCfg = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|x64.Build.0 = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|x86.ActiveCfg = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Debug|x86.Build.0 = Debug|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|Any CPU.Build.0 = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|x64.ActiveCfg = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|x64.Build.0 = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|x86.ActiveCfg = Release|Any CPU
{854BE079-6DA2-4138-B707-6627315C46F6}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -177,5 +191,6 @@ Global
{87302267-6FBF-4E28-ABC4-2FF832DAC72B} = {B705FA55-841C-4792-B357-0E0F7E7BBD2D}
{B078ECA6-201B-43B3-B474-E30856D37992} = {B705FA55-841C-4792-B357-0E0F7E7BBD2D}
{D98E220B-165F-4A26-AC1D-847FDC4B1710} = {B10EC218-4988-49EE-982C-8FE265BD1B12}
{854BE079-6DA2-4138-B707-6627315C46F6} = {B705FA55-841C-4792-B357-0E0F7E7BBD2D}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public async Task<IEnumerable<CategoryViewResponse>> GetAsync()
{
var result = await _categoryQuery.Ask<List<CategoryViewResponse>>(
new ListCategoryViewRequest());

return result;
}

Expand Down
3 changes: 1 addition & 2 deletions src/Cik.Magazine.ApiGateway/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void ConfigureServices(IServiceCollection services)
Props.Empty.WithRouter(FromConfig.Instance), "category-query-group");
_categoryCommanderActor = _systemActor.ActorOf(
Props.Empty.WithRouter(FromConfig.Instance), "category-commander-group");

services.AddSingleton<IActorRefFactory>(serviceProvider => _systemActor);

// GraphQL
Expand Down Expand Up @@ -140,7 +139,7 @@ private void OnShutdown()
{
_categoryQueryActor.Tell(PoisonPill.Instance);
_categoryCommanderActor.Tell(PoisonPill.Instance);
_systemActor.Terminate().Wait(5000);
_systemActor.Terminate();
}
}
}
7 changes: 4 additions & 3 deletions src/Cik.Magazine.ApiGateway/app.config
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
akka {
suppress-json-serializer-warning = on
loglevel = INFO
log-remote-lifecycle-events = INFO
serializers {
wire = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
Expand All @@ -24,8 +25,8 @@
/category-query-group {
router = round-robin-group
routees.paths = [
#"akka.tcp://magazine-system@localhost:8099/user/category-query",
"akka.tcp://magazine-system@localhost:8092/user/category-query"
#"akka.tcp://magazine-system@localhost:8099/user/category-query",
]
}
/category-commander-group {
Expand All @@ -52,10 +53,10 @@
hostname = localhost
}
}
log-remote-lifecycle-events = INFO
cluster {
seed-nodes = ["akka.tcp://magazine-system@localhost:8090"]
roles = [webapp]
auto-down-unreachable-after = 10s
roles = [webapi]
}
}
]]>
Expand Down
18 changes: 16 additions & 2 deletions src/Cik.Magazine.CategoryService/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@
}
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
deployment {
/category-status-broadcaster-group {
router = consistent-hashing-group
routees.paths = ["akka.tcp://magazine-system@localhost:8099/user/category-status-broadcaster"]
#virtual-nodes-factor = 8
#cluster {
# enabled = on
# max-nr-of-instances-per-node = 2
# allow-local-routees = off
# use-role = pm
#}
}
}
debug {
receive = on
autoreceive = on
Expand All @@ -35,13 +48,14 @@
transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
applied-adapters = []
transport-protocol = tcp
port = 8092 # 8099
port = 8092 #8099
hostname = localhost
}
}
cluster {
seed-nodes = ["akka.tcp://magazine-system@localhost:8090"]
roles = [service]
auto-down-unreachable-after = 10s
roles = [category]
}
persistence {
publish-plugin-commands = on
Expand Down
1 change: 0 additions & 1 deletion src/Cik.Magazine.CategoryService/CategoryService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using Akka.Actor;
using Akka.Persistence.Fsm;
using Cik.Magazine.Shared.Messages.Category;
using MongoDB.Bson.Serialization;
using Serilog;
Expand Down
11 changes: 7 additions & 4 deletions src/Cik.Magazine.CategoryService/CategoryServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Cik.Magazine.CategoryService.Denomalizer.Projections;
using Cik.Magazine.CategoryService.Domain;
using Cik.Magazine.CategoryService.Query;
using Cik.Magazine.CategoryService.Sagas;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Domain;

Expand All @@ -18,7 +17,7 @@ public static IActorRef CategoryCommanderAggregate(this IActorRefFactory system,
{
var nameOfCommanderActor = SystemData.CategoryCommanderActor.Name;
var nameofProjectionActor = SystemData.CategoryProjectionsActor.Name;
var nameOfProcessManagerActor = "category-process-manager";
// var nameOfProcessManagerActor = "category-process-manager";

// build up the category actor
var projectionsProps = new ConsistentHashingPool(10)
Expand All @@ -27,10 +26,14 @@ public static IActorRef CategoryCommanderAggregate(this IActorRefFactory system,

/*var processManagerProps = new ConsistentHashingPool(1)
.Props(Props.Create(() => new CategoryProcessManager(id)));*/
var processManager = system.ActorOf(Props.Create(() => new CategoryProcessManager(id)));
// var processManager = system.ActorOf(Props.Create(() => new CategoryProcessManager(id)));
// var categoryStatusSaga = system.ActorSelection($"/user/{SystemData.CategoryStatusSagaActor.Name}-group");

var categoryStatusSagaActor = system.ActorOf(
Props.Empty.WithRouter(FromConfig.Instance), "category-status-broadcaster-group");

var creationParams = new AggregateRootCreationParameters(id, projections,
new HashSet<IActorRef>(new List<IActorRef> {processManager}), snapshotThreshold);
new HashSet<IActorRef>(new List<IActorRef> { categoryStatusSagaActor }), snapshotThreshold);
return system.ActorOf(Props.Create<Category>(creationParams), nameOfCommanderActor);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk" ToolsVersion="15.0">

<PropertyGroup Label="Configuration" Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<OutputType>exe</OutputType>
</PropertyGroup>
Expand Down Expand Up @@ -48,4 +49,5 @@
<HintPath>..\..\..\..\..\..\thangchung.HARVEYNASH\.nuget\packages\microsoft.extensions.dependencyinjection.abstractions\1.1.0\lib\netstandard1.0\Microsoft.Extensions.DependencyInjection.Abstractions.dll</HintPath>
</Reference>
</ItemGroup>

</Project>
18 changes: 14 additions & 4 deletions src/Cik.Magazine.CategoryService/Domain/Category.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Akka;
using System.Collections.Generic;
using Akka;
using Akka.Actor;
using Cik.Magazine.Shared;
using Cik.Magazine.Shared.Domain;

Expand All @@ -7,22 +9,30 @@ namespace Cik.Magazine.CategoryService.Domain
public class Category : AggregateRootActor
{
private CategoryState _state;
public ISet<ActorSelection> ActorSelections { get; }

public Category(AggregateRootCreationParameters parameters)
: base(parameters)
{
_state = new CategoryState {EventSink = this};

// register sagas
ActorSelections = new HashSet<ActorSelection>
{
Context.ActorSelection("/user/category-status-broadcaster-group")
};
}

protected override bool Handle(ICommand command)
{
_state.Handle(command);

// span out for all the process managers
foreach (var pm in ProcessManagers)
// span out to sagas
foreach (var pm in ActorSelections)
{
pm.Tell(command, Sender);
pm.Tell(command);
}

return true;
}

Expand Down
11 changes: 0 additions & 11 deletions src/Cik.Magazine.CategoryService/Query/CategoryQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using System.IO;
using Akka.Actor;
using Akka.Event;
using Cik.Magazine.CategoryService.Sagas;
using Cik.Magazine.Shared.Messages.Category;
using Cik.Magazine.Shared.Queries;
using MongoDB.Driver;

Expand Down Expand Up @@ -35,15 +33,6 @@ public void Handle(CategoryViewRequest message)

public void Handle(ListCategoryViewRequest message)
{
// var saga = Context.ActorOf(Props.Create(() => new CategoryProcessManager(new Guid("8f88d4f42e3c4a868b4667dfe5f97bea"))));
// saga.Tell(Shared.Messages.Category.Status.Reviewing);
// saga.Tell(new CreateCategory(Guid.NewGuid(), "cat1", Shared.Messages.Category.Status.Draft));

/* saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);
saga.Tell(Cik.Magazine.Shared.Messages.Category.Status.Reviewing);*/

_log.Info("Received message[{0}] and query data in NoSQL data source.", message.GetType().Name);
var db = _mongoClient.GetDatabase("magazine");
var col = db.GetCollection<CategoryViewResponse>("categories");
Expand Down
110 changes: 110 additions & 0 deletions src/Cik.Magazine.ProcessManager/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?xml version="1.0" encoding="utf-8"?>

<configuration>
<configSections>
<section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
</configSections>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
<akka>
<hocon>
<![CDATA[
akka {
suppress-json-serializer-warning = on
log-remote-lifecycle-events = INFO
loglevel = INFO
serializers {
wire = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
}
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
remote {
helios.tcp {
transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
applied-adapters = []
transport-protocol = tcp
port = 8099
hostname = localhost
}
}
cluster {
seed-nodes = ["akka.tcp://magazine-system@localhost:8090"]
auto-down-unreachable-after = 10s
roles = [pm]
}
persistence {
publish-plugin-commands = on
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
# qualified type name of the MongoDb persistence journal actor
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
# connection string used for database access
connection-string = "mongodb://127.0.0.1:27017/magazine_store"
# should corresponding journal table's indexes be initialized automatically
auto-initialize = on
# dispatcher used to drive journal actor
plugin-dispatcher = "akka.actor.default-dispatcher"
# MongoDb collection corresponding with persistent journal
collection = "EventJournal"
# metadata collection
metadata-collection = "Metadata"
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
# qualified type name of the MongoDB persistence snapshot actor
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
# connection string used for database access
connection-string = "mongodb://127.0.0.1:27017/magazine_store"
# should corresponding snapshot's indexes be initialized automatically
auto-initialize = on
# dispatcher used to drive snapshot storage actor
plugin-dispatcher = "akka.actor.default-dispatcher"
# MongoDb collection corresponding with persistent snapshot store
collection = "SnapshotStore"
}
}
}
}
]]>
</hocon>
</akka>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="System.Collections.Immutable" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.1.0" newVersion="1.2.1.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Runtime.InteropServices.RuntimeInformation" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.0.1.0" newVersion="4.0.1.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>
Loading

0 comments on commit 8b92e4f

Please sign in to comment.