Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrating MarketSubscribe events into custom Petabridge.Cmd palette #5

Merged
merged 2 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Akka.CQRS.Pricing.Cli/Akka.CQRS.Pricing.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.CQRS.Pricing.Subscriptions\Akka.CQRS.Pricing.Subscriptions.csproj" />
<ProjectReference Include="..\Akka.CQRS.Pricing\Akka.CQRS.Pricing.csproj" />
</ItemGroup>

Expand Down
33 changes: 23 additions & 10 deletions src/Akka.CQRS.Pricing.Cli/PriceCmdRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Actor;
using Akka.CQRS.Pricing.Commands;
using Akka.CQRS.Pricing.Views;
using Akka.Event;
using Petabridge.Cmd;
using Petabridge.Cmd.Host;

Expand All @@ -20,6 +21,7 @@ namespace Akka.CQRS.Pricing.Cli
/// </summary>
public sealed class PriceCmdRouter : CommandHandlerActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private IActorRef _priceViewMaster;

public PriceCmdRouter(IActorRef priceViewMaster) : base(PricingCmd.PricingCommandPalette)
Expand All @@ -38,21 +40,32 @@ public PriceCmdRouter(IActorRef priceViewMaster) : base(PricingCmd.PricingComman
Process(PricingCmd.PriceHistory.Name, (command, arguments) =>
{
var tickerSymbol = arguments.ArgumentValues("symbol").Single();
var getPriceTask = _priceViewMaster.Ask<PriceHistory>(new GetPriceHistory(tickerSymbol), TimeSpan.FromSeconds(5));
var getPriceTask = _priceViewMaster.Ask<PriceAndVolumeSnapshot>(new FetchPriceAndVolume(tickerSymbol), TimeSpan.FromSeconds(5));
var sender = Sender;

// pipe happy results back to the sender only on successful Ask
getPriceTask.ContinueWith(tr =>
{
return Enumerable.Select(tr.Result.HistoricalPrices, x => new CommandResponse(x.ToString(), false))
.Concat(new []{ CommandResponse.Empty });
}, TaskContinuationOptions.OnlyOnRanToCompletion).PipeTo(sender);

// pipe unhappy results back to sender on failure
getPriceTask.ContinueWith(tr =>
new ErroredCommandResponse($"Error while fetching price history for {tickerSymbol} - " +
$"timed out after 5s"), TaskContinuationOptions.NotOnRanToCompletion)
.PipeTo(sender); ;
try
{
if (tr.Result.PriceUpdates.Length == 0)
return new[]
{new CommandResponse($"No historical price data available for [{tr.Result.StockId}]")};

return Enumerable.Select(tr.Result.PriceUpdates, x => new CommandResponse(x.ToString(), false))
.Concat(new[] {CommandResponse.Empty});
}
catch (Exception ex)
{
_log.Error(ex, "Exception while returning price history for {0}", tickerSymbol);
return new[] {CommandResponse.Empty};
}

}).ContinueWith(tr =>
{
foreach(var r in tr.Result)
sender.Tell(r, ActorRefs.NoSender);
});
});
}
}
Expand Down
39 changes: 29 additions & 10 deletions src/Akka.CQRS.Pricing.Cli/PriceTrackingActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.CQRS.Pricing.Commands;
using Akka.CQRS.Pricing.Events;
using Akka.CQRS.Pricing.Views;
using Akka.CQRS.Pricing.Subscriptions;
using Petabridge.Cmd;

namespace Akka.CQRS.Pricing.Cli
Expand Down Expand Up @@ -36,33 +37,51 @@ public PriceTrackingActor(string tickerSymbol, IActorRef priceViewActor, IActorR

private void WaitingForPriceHistory()
{
Receive<PriceHistory>(p =>
Receive<PriceAndVolumeSnapshot>(p =>
{
if (p.HistoricalPrices.IsEmpty)
if (p.PriceUpdates.Length == 0)
{
_commandHandlerActor.Tell(new CommandResponse($"No historical price data for [{_tickerSymbol}] - waiting for updates.", false));
BecomePriceUpdates();
BecomeWaitingForSubscribe();
return;
}

_currentPrice = p.CurrentPriceUpdate;
foreach (var e in p.HistoricalPrices)
_currentPrice = p.PriceUpdates.Last();
foreach (var e in p.PriceUpdates)
{
_commandHandlerActor.Tell(new CommandResponse(e.ToString(), false));
}

BecomePriceUpdates();
BecomeWaitingForSubscribe();
});

Receive<ReceiveTimeout>(t =>
{
_commandHandlerActor.Tell(new CommandResponse($"No historical price data for [{_tickerSymbol}] - waiting for updates.", false));
BecomePriceUpdates();
BecomeWaitingForSubscribe();
});

ReceiveAny(_ => Stash.Stash());
}

private void BecomeWaitingForSubscribe()
{
_priceViewActor.Tell(new MarketSubscribe(_tickerSymbol, new[] { MarketEventType.PriceChange }, Self));
Become(WaitingForSubscribeAck);
}

private void WaitingForSubscribeAck()
{
Receive<MarketSubscribeAck>(ack => { BecomePriceUpdates(); });

Receive<ReceiveTimeout>(t =>
{
_commandHandlerActor.Tell(new CommandResponse($"Timed out while connecting to live price feed for [{_tickerSymbol}]. Retrying..."));
_priceViewActor.Tell(new MarketSubscribe(_tickerSymbol, new[] { MarketEventType.PriceChange }, Self));
BecomePriceUpdates();
});
}

private void BecomePriceUpdates()
{
Context.SetReceiveTimeout(null);
Expand All @@ -87,10 +106,10 @@ private void PriceUpdates()

protected override void PreStart()
{
var getlatestPrice = new GetLatestPrice(_tickerSymbol);
var getlatestPrice = new FetchPriceAndVolume(_tickerSymbol);

// get the historical price
_priceViewActor.Tell(new GetPriceHistory(_tickerSymbol));
_priceViewActor.Tell(getlatestPrice);
_priceCheckInterval = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(3), _priceViewActor, getlatestPrice, Self);

Expand Down
16 changes: 8 additions & 8 deletions src/Akka.CQRS.Pricing.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ static int Main(string[] args)

var actorSystem = ActorSystem.Create("AkkaTrader", conf.BootstrapFromDocker());

Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
{
var sharding = ClusterSharding.Get(actorSystem);
var sharding = ClusterSharding.Get(actorSystem);

var shardRegion = sharding.Start("priceAggregator",
s => Props.Create(() => new MatchAggregator(s)),
ClusterShardingSettings.Create(actorSystem),
new StockShardMsgRouter());
var shardRegion = sharding.Start("priceAggregator",
s => Props.Create(() => new MatchAggregator(s)),
ClusterShardingSettings.Create(actorSystem),
new StockShardMsgRouter());

Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
{
foreach (var ticker in AvailableTickerSymbols.Symbols)
{
shardRegion.Tell(new Ping(ticker));
Expand All @@ -81,7 +81,7 @@ void RegisterPalette(CommandPaletteHandler h)
RegisterPalette(ClusterCommands.Instance);
RegisterPalette(RemoteCommands.Instance);
RegisterPalette(ClusterShardingCommands.Instance);
//RegisterPalette(new PriceCommands(priceViewMaster));
RegisterPalette(new PriceCommands(shardRegion));
pbm.Start();

actorSystem.WhenTerminated.Wait();
Expand Down
5 changes: 5 additions & 0 deletions src/Akka.CQRS.Pricing/Events/VolumeChanged.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@ public int CompareTo(VolumeChanged other)
if (ReferenceEquals(null, other)) return 1;
return Timestamp.CompareTo(other.Timestamp);
}

public override string ToString()
{
return $"[{StockId}][{Timestamp}] - $[{CurrentVolume}]";
}
}
}