forked from akkadotnet/akka.net
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBatchWriterHandler.cs
110 lines (94 loc) · 3.8 KB
/
BatchWriterHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//-----------------------------------------------------------------------
// <copyright file="BatchWriterHandler.cs" company="Akka.NET Project">
// Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using DotNetty.Buffers;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;
using DotNettyIRunnable = DotNetty.Common.Concurrency.IRunnable;
namespace Akka.Remote.Transport.DotNetty
{
/// <summary>
/// INTERNAL API.
///
/// Responsible for batching socket writes together into fewer sys calls to the socket.
/// </summary>
/// <remarks>
/// Writes are forwarded down the pipeline immediately, but a flush is only issued when
/// the number of pending writes reaches <c>Settings.MaxPendingWrites</c>, when the number
/// of pending bytes reaches <c>Settings.MaxPendingBytes</c>, or when the recurring flush
/// task (scheduled every <c>Settings.FlushInterval</c> on the context's executor) finds
/// pending writes.
/// </remarks>
internal class BatchWriterHandler : ChannelHandlerAdapter
{
    /// <summary>
    /// The thresholds and flush interval used by this handler.
    /// </summary>
    public readonly BatchWriterSettings Settings;

    // Handle to the recurring flush task; null until the handler is added to a pipeline
    // and again after the task has been cancelled.
    private IScheduledTask _scheduledTask;

    // Number of writes / bytes forwarded downstream since the last flush issued by this handler.
    private int _currentPendingWrites;
    private long _currentPendingBytes;

    /// <summary>
    /// Creates a new handler using the supplied batching settings.
    /// </summary>
    /// <param name="settings">The thresholds and flush interval to apply.</param>
    public BatchWriterHandler(BatchWriterSettings settings)
    {
        Settings = settings;
    }

    /// <summary>
    /// <c>true</c> when at least one write has been forwarded since the last flush.
    /// </summary>
    public bool HasPendingWrites => (uint)_currentPendingWrites > 0u;

    /// <summary>
    /// Starts the recurring flush task as soon as the handler joins a pipeline.
    /// </summary>
    /// <param name="context">The context this handler was added with.</param>
    public override void HandlerAdded(IChannelHandlerContext context)
    {
        ScheduleFlush(context);
        base.HandlerAdded(context);
    }

    /// <summary>
    /// Stops the recurring flush task when the handler leaves the pipeline, so the task
    /// does not keep running against a context this handler no longer belongs to.
    /// </summary>
    /// <param name="context">The context this handler was removed from.</param>
    public override void HandlerRemoved(IChannelHandlerContext context)
    {
        CancelScheduledFlush();
        base.HandlerRemoved(context);
    }

    /// <summary>
    /// Forwards the write downstream, then flushes if a pending-write or pending-byte
    /// threshold has been reached.
    /// </summary>
    /// <param name="context">The handler context.</param>
    /// <param name="message">The outbound message; its size is counted when it is an <see cref="IByteBuffer"/>.</param>
    /// <param name="promise">The promise passed along with the write.</param>
    public override void Write(IChannelHandlerContext context, object message, IPromise promise)
    {
        // Measure the message BEFORE handing it downstream: once it has been passed on,
        // later handlers may consume or release the buffer. Messages that are not byte
        // buffers are counted as a write that contributes zero bytes.
        var buffer = message as IByteBuffer;
        long messageBytes = buffer != null ? buffer.ReadableBytes : 0L;

        /*
         * Need to add the write to the rest of the pipeline first before we
         * include it in the formula for determining whether or not we flush
         * right now. The reason being is that if we did this the other way around,
         * we could flush first before the write was in the "flushable" buffer and
         * this can lead to "dangling writes" that never actually get transmitted
         * across the network.
         */
        base.Write(context, message, promise);

        _currentPendingBytes += messageBytes;
        _currentPendingWrites++;

        if (_currentPendingWrites >= Settings.MaxPendingWrites
            || _currentPendingBytes >= Settings.MaxPendingBytes)
        {
            context.Flush();
            Reset();
        }
    }

    /// <summary>
    /// Flushes anything still pending, stops the recurring flush task and then closes.
    /// </summary>
    /// <param name="context">The handler context.</param>
    /// <param name="promise">The promise passed along with the close.</param>
    public override void Close(IChannelHandlerContext context, IPromise promise)
    {
        // flush any pending writes first
        context.Flush();
        Reset();
        CancelScheduledFlush();
        base.Close(context, promise);
    }

    /// <summary>
    /// Clears the pending write and byte counters.
    /// </summary>
    public void Reset()
    {
        _currentPendingWrites = 0;
        _currentPendingBytes = 0;
    }

    // Schedules the recurring flush. The task runs every FlushInterval but only issues
    // a flush when there are pending writes.
    private void ScheduleFlush(IChannelHandlerContext context)
    {
        var task = new FlushTask(context, this);
        _scheduledTask = context.Executor.ScheduleAtFixedRate(task, Settings.FlushInterval, Settings.FlushInterval);
    }

    // Cancels the recurring flush, if one is scheduled. Safe to call more than once.
    private void CancelScheduledFlush()
    {
        _scheduledTask?.Cancel();
        _scheduledTask = null;
    }

    /// <summary>
    /// Recurring task that flushes the context whenever the owning handler has pending writes.
    /// </summary>
    class FlushTask : DotNettyIRunnable
    {
        private readonly IChannelHandlerContext _context;
        private readonly BatchWriterHandler _writer;

        public FlushTask(IChannelHandlerContext context, BatchWriterHandler writer)
        {
            _context = context;
            _writer = writer;
        }

        public void Run()
        {
            if (_writer.HasPendingWrites)
            {
                // execute a flush operation
                _context.Flush();
                _writer.Reset();
            }
        }
    }
}
}