Skip to content

Commit

Permalink
Using BufferedStream over NetworkStream for performance improvement. (m…
Browse files Browse the repository at this point in the history
…icrosoft#1041)

* Using BufferedStream over NetworkStream for performance improvement.

- Used BufferedStream over NetworkStream while writing the data to reduce the number of calls.
- Used BufferedSize of 16KB.

* Comments changed & Reverted some changes.

* Addressed PR comments.

* Adding test for the hang scenario.

* Moving tests from performance folder to platform tests.

* Resolved PR comments.
  • Loading branch information
navin22 authored Sep 7, 2017
1 parent 9d10144 commit cffd8a8
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.IO;
using System.Text;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
/// A communication channel using a length prefix packet frame for communication.
/// </summary>
public class LengthPrefixCommunicationChannel : ICommunicationChannel
{
private readonly Stream stream;

private readonly BinaryReader reader;

private readonly BinaryWriter writer;

public LengthPrefixCommunicationChannel(Stream stream)
{
this.stream = stream;
this.reader = new BinaryReader(stream, Encoding.UTF8, true);
this.writer = new BinaryWriter(stream, Encoding.UTF8, true);

// Using the Buffered stream while writing, improves the write performance. By reducing the number of writes.
this.writer = new BinaryWriter(new PlatformStream().CreateBufferedStream(stream, SocketConstants.BufferSize), Encoding.UTF8, true);
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand All @@ -35,7 +36,7 @@ protected SocketClient(Func<Stream, ICommunicationChannel> channelFactory)
this.cancellation = new CancellationTokenSource();
this.stopped = false;

this.tcpClient = new TcpClient();
this.tcpClient = new TcpClient { NoDelay = true };
this.channelFactory = channelFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;

/// <summary>
/// Facilitates communication using sockets
Expand Down Expand Up @@ -70,11 +70,6 @@ public class SocketCommunicationManager : ICommunicationManager
/// </summary>
private object sendSyncObject = new object();

/// <summary>
/// Stream to use read timeout
/// </summary>
private NetworkStream stream;

private Socket socket;

/// <summary>
Expand All @@ -100,7 +95,6 @@ internal SocketCommunicationManager(IDataSerializer dataSerializer)
public IPEndPoint HostServer(IPEndPoint endpoint)
{
this.tcpListener = new TcpListener(endpoint);

this.tcpListener.Start();
EqtTrace.Info("Listening on Endpoint : {0}", (IPEndPoint)this.tcpListener.LocalEndpoint);

Expand All @@ -119,13 +113,20 @@ public async Task AcceptClientAsync()

var client = await this.tcpListener.AcceptTcpClientAsync();
this.socket = client.Client;
this.stream = client.GetStream();
this.binaryReader = new BinaryReader(this.stream);
this.binaryWriter = new BinaryWriter(this.stream);
this.socket.NoDelay = true;

this.clientConnectedEvent.Set();
// Using Buffered stream only in case of write, and Network stream in case of read.
var bufferedStream = new PlatformStream().CreateBufferedStream(client.GetStream(), SocketConstants.BufferSize);
var networkStream = client.GetStream();
this.binaryReader = new BinaryReader(networkStream);
this.binaryWriter = new BinaryWriter(bufferedStream);

EqtTrace.Info("Accepted Client request and set the flag");
this.clientConnectedEvent.Set();
if (EqtTrace.IsInfoEnabled)
{
EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize);
EqtTrace.Info("Accepted Client request and set the flag");
}
}
}

Expand Down Expand Up @@ -165,7 +166,7 @@ public async Task SetupClientAsync(IPEndPoint endpoint)
// for now added a check for validation of this.tcpclient
this.clientConnectionAcceptedEvent.Reset();
EqtTrace.Info("Trying to connect to server on socket : {0} ", endpoint);
this.tcpClient = new TcpClient();
this.tcpClient = new TcpClient { NoDelay = true };
this.socket = this.tcpClient.Client;

Stopwatch watch = new Stopwatch();
Expand All @@ -178,10 +179,18 @@ public async Task SetupClientAsync(IPEndPoint endpoint)

if (this.tcpClient.Connected)
{
this.stream = this.tcpClient.GetStream();
this.binaryReader = new BinaryReader(this.stream);
this.binaryWriter = new BinaryWriter(this.stream);
EqtTrace.Info("Connected to the server successfully ");
// Using Buffered stream only in case of write, and Network stream in case of read.
var bufferedStream = new PlatformStream().CreateBufferedStream(this.tcpClient.GetStream(), SocketConstants.BufferSize);
var networkStream = this.tcpClient.GetStream();
this.binaryReader = new BinaryReader(networkStream);
this.binaryWriter = new BinaryWriter(bufferedStream);

if (EqtTrace.IsInfoEnabled)
{
EqtTrace.Info("Connected to the server successfully ");
EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize);
}

this.clientConnectionAcceptedEvent.Set();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
{
public class SocketConstants
{
// Buffer size for the buffered stream we are using.
public const int BufferSize = 16384;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand Down Expand Up @@ -88,10 +88,11 @@ public void Stop()
private void OnClientConnected(TcpClient client)
{
this.tcpClient = client;
this.tcpClient.Client.NoDelay = true;

if (this.ClientConnected != null)
{
this.channel = this.channelFactory(client.GetStream());
this.channel = this.channelFactory(this.tcpClient.GetStream());
this.ClientConnected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketServer: ClientConnected");

// Start the message loop
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces
{
using System.IO;

/// <summary>
/// Helper class to return plaform specific stream.
/// </summary>
public interface IStream
{
/// <summary>
/// Returns platrform specific Buffered Stream with desired buffer size.
/// </summary>
/// <param name="stream">Input Stream</param>
/// <param name="bufferSize">Buffer Size</param>
/// <returns>Buffered Stream</returns>
Stream CreateBufferedStream(Stream stream, int bufferSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
return new BufferedStream(stream, bufferSize);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System;
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
return stream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
namespace Microsoft.TestPlatform.CommunicationUtilities.PlatformTests
{
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Expand Down Expand Up @@ -264,9 +265,55 @@ public async Task ReceiveRawMessageAsyncShouldNotDeserializeThePayload()

Assert.AreEqual(DummyPayload, message);
}

#endregion

[TestMethod]
public void SocketPollShouldNotHangServerClientCommunication()
{
// Measure the throughput with socket communication v1 (SocketCommunicationManager)
// implementation.
var server = new SocketCommunicationManager();
var client = new SocketCommunicationManager();

int port = server.HostServer(new IPEndPoint(IPAddress.Loopback, 0)).Port;
client.SetupClientAsync(new IPEndPoint(IPAddress.Loopback, port)).Wait();
server.AcceptClientAsync().Wait();

server.WaitForClientConnection(1000);
client.WaitForServerConnection(1000);

var clientThread = new Thread(() => SendData(client));
clientThread.Start();

var dataReceived = 0;
while (dataReceived < 2048 * 5)
{
dataReceived += server.ReceiveRawMessageAsync(CancellationToken.None).Result.Length;
Task.Delay(1000).Wait();
}

clientThread.Join();

Assert.IsTrue(true);
}

private static void SendData(ICommunicationManager communicationManager)
{
// Having less than the buffer size in SocketConstants.BUFFERSIZE.
var dataBytes = new byte[2048];
for (int i = 0; i < dataBytes.Length; i++)
{
dataBytes[i] = 0x65;
}

var dataBytesStr = Encoding.UTF8.GetString(dataBytes);

for (int i = 0; i < 5; i++)
{
communicationManager.SendRawMessage(dataBytesStr);
}
}

private int StartServer()
{
this.tcpListener.Start();
Expand Down

0 comments on commit cffd8a8

Please sign in to comment.