Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sylvan csv benchmark #2

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/FileIO/FileIO.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<PackageReference Include="CsvHelper" Version="27.1.0" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
<PackageReference Include="System.Linq.Async" Version="5.0.0" />
<PackageReference Include="Sylvan.Data.Csv" Version="1.0.3" />
</ItemGroup>

</Project>
23 changes: 20 additions & 3 deletions src/FileIO/WithCsvHelperLib.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
Expand All @@ -9,12 +10,28 @@ namespace FileIO
public class WithCsvHelperLib
{

public IEnumerable<Employee> ProcessFileAsync(string filePath)
public IEnumerable<Employee> ProcessFileAsync(string filePath)
{
using var reader = new StreamReader(filePath);

Employee[] employees = new Employee[100000];
using var csv = new CsvReader(reader, CultureInfo.InvariantCulture);
var records = csv.GetRecords<Employee>();
return records.ToList();
int idx = 0;
csv.Read();
while (csv.Read())
{
var emp = new Employee
{
Name = csv[0],
Email = csv[1],
DateOfJoining = DateTime.Parse(csv[2]),
Salary = double.Parse(csv[3]),
Age = int.Parse(csv[4]),
};
employees[idx++] = emp;

}
return employees;

}
}
Expand Down
122 changes: 42 additions & 80 deletions src/FileIO/WithPipeLines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public class WithPipeLines
/// <returns>PipeReader Sequence Position</returns>
public async Task<int> ProcessFileAsync(string filePath, Employee[] employeeRecords)
{
const int BufferSize = 0x10000;
var position = 0;
if (!File.Exists(filePath)) return position;
await using var fileStream = File.OpenRead(filePath);
var pipeReader = PipeReader.Create(fileStream);
await using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, BufferSize);
var pipeReader = PipeReader.Create(fileStream, new StreamPipeReaderOptions(bufferSize: BufferSize));
while (true)
{
var fileData = await pipeReader.ReadAsync();
Expand All @@ -47,103 +47,65 @@ public async Task<int> ProcessFileAsync(string filePath, Employee[] employeeReco
private static SequencePosition ParseLines(Employee[] employeeRecords, in ReadOnlySequence<byte> buffer, ref int position)
{
var reader = new SequenceReader<byte>(buffer);
ReadOnlySpan<byte> line;

// skip the header row
reader.TryReadTo(out line, (byte)'\n', true);
// Read the whole line till the new line is found
while (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n', true))
while (reader.TryReadTo(out line, (byte)'\n', true))
{
var employee = LineParser.ParseLine(line); // we have a line to parse

if (employee is { }) // if the returned value is valid Employee object
employeeRecords[position++] = employee.Value;
employeeRecords[position++] = employee;
}

return reader.Position; // returning the Last position of the reader
}

private static class LineParser
{
private const byte Coma = (byte)',';
private static readonly byte[] ColumnHeaders = Encoding.UTF8.GetBytes("Name,Email,DateOfJoining,Salary,Age");
private const byte Comma = (byte)',';

public static Employee? ParseLine(ReadOnlySpan<byte> line)
public static Employee ParseLine(ReadOnlySpan<byte> line)
{
// REVIEW: There are better ways to do this
if (line.IndexOf(ColumnHeaders) >= 0) // Ignore the Header row
{
return null;
}

// Trim \r (if it exists)
line = line.TrimEnd((byte)'\r');

var fieldCount = 1;

var record = new Employee();

while (fieldCount <= 5) // we have five fields in csv file
var idx = line.IndexOf(Comma);

record.Name = Encoding.UTF8.GetString(line[..idx]);
line = line[(idx + 1)..];
idx = line.IndexOf(Comma);
record.Email = Encoding.UTF8.GetString(line[..idx]);
line = line[(idx + 1)..];
idx = line.IndexOf(Comma);

// stand on our heads to avoid allocating a temp string to parse the date
var buffer = line[..idx];
Span<char> chars = stackalloc char[buffer.Length];
for (int i = 0; i < buffer.Length; i++)
{
chars[i] = (char)buffer[i];
}
if (DateTime.TryParse(chars, out var doj))
{
record.DateOfJoining = doj;
}
line = line[(idx + 1)..];
idx = line.IndexOf(Comma);

if (Utf8Parser.TryParse(line[..idx], out double salary, out _))
{
record.Salary = salary;
}

line = line[(idx + 1)..];

if (Utf8Parser.TryParse(line, out short age, out _))
{
var comaAt = line.IndexOf(Coma);
if (comaAt < 0) // No more comas are found we have reached the last field.
{
comaAt = line.Length;
}

switch (fieldCount)
{
case 1:
{
var value = Encoding.UTF8.GetString(line[..comaAt]);
record.Name = value;
break;
}
case 2:
{
var value = Encoding.UTF8.GetString(line[..comaAt]);
record.Email = value;
break;
}
case 3:
{
var buffer = line[..comaAt];
if (DateTime.TryParse(Encoding.UTF8.GetString(line[..comaAt]), out var doj))

{
record.DateOfJoining = doj;
}
// Can't use Utf8 parser to extract datetime field because csv format doesn't have time
//https://docs.microsoft.com/en-us/dotnet/api/system.buffers.text.utf8parser.tryparse?view=net-5.0#System_Buffers_Text_Utf8Parser_TryParse_System_ReadOnlySpan_System_Byte__System_DateTime__System_Int32__System_Char_

// if (Utf8Parser.TryParse(buffer, out DateTime value, out var bytesConsumed))
// {
// record.DateOfJoining = value;
// }
break;
}

case 4:
{
var buffer = line[..comaAt];
if (Utf8Parser.TryParse(buffer, out double value, out var bytesConsumed))
{
record.Salary = value;
}
break;
}

case 5:
{
var buffer = line[..comaAt];
if (Utf8Parser.TryParse(buffer, out short value, out var bytesConsumed))
{
record.Age = value;
}
return record;
}
}

line = line[(comaAt + 1)..]; // slice past field

fieldCount++;
record.Age = age;
}

return record;
Expand Down
57 changes: 57 additions & 0 deletions src/FileIO/WithSylvanLib.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.IO;
using Sylvan.Data.Csv;
using System.Buffers;
using System.Threading.Tasks;
using System.Text;

namespace FileIO
{
public class WithSylvanLib
{
public void ProcessFile(string filePath, Employee[] employeeRecords)
{
const int BufferSize = 0x10000;
using var reader = new StreamReader(filePath, Encoding.UTF8, false, BufferSize);

char[] buffer = ArrayPool<char>.Shared.Rent(BufferSize);

using var csv = CsvDataReader.Create(reader, new CsvDataReaderOptions { Buffer = buffer });
int idx = 0;
while (csv.Read())
{
employeeRecords[idx++] = new Employee
{
Name = csv.GetString(0),
Email = csv.GetString(1),
DateOfJoining = csv.GetDateTime(2),
Salary = csv.GetDouble(3),
Age = csv.GetInt32(4),
};
}
ArrayPool<char>.Shared.Return(buffer);
}

public async Task ProcessFileAsync(string filePath, Employee[] employeeRecords)
{
const int BufferSize = 0x10000;
using var reader = new StreamReader(filePath, Encoding.UTF8, false, BufferSize);

char[] buffer = ArrayPool<char>.Shared.Rent(BufferSize);

await using var csv = await CsvDataReader.CreateAsync(reader, new CsvDataReaderOptions { Buffer = buffer });
int idx = 0;
while (await csv.ReadAsync())
{
employeeRecords[idx++] = new Employee
{
Name = csv.GetString(0),
Email = csv.GetString(1),
DateOfJoining = csv.GetDateTime(2),
Salary = csv.GetDouble(3),
Age = csv.GetInt32(4),
};
}
ArrayPool<char>.Shared.Return(buffer);
}
}
}
57 changes: 40 additions & 17 deletions tests/FileIO.Benchmarks/FileIOTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,11 @@ namespace FileIO.Benchmarks
[RankColumn()]
public class FileIOTest
{
private string _filePath;
[GlobalSetup]
public void Setup()
{
var directoryPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(Program))?.Location);
_filePath = Path.Combine(directoryPath ?? string.Empty, "Employees.csv");
}
private string _filePath = "Employees.csv";

[Benchmark]
public async Task PipeLines()
{
// var directoryPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(Program))?.Location);
//_filePath = Path.Combine(directoryPath ?? string.Empty, "Employees.csv");
var pool = ArrayPool<Employee>.Shared;
var employeeRecords = pool.Rent(100000);
var pipeLinesTest = new WithPipeLines();
Expand All @@ -41,22 +34,52 @@ public async Task PipeLines()

[Benchmark]
public async Task<IList<Employee>> AsyncStream()
{
// var directoryPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(Program))?.Location);
// _filePath = Path.Combine(directoryPath ?? string.Empty, "Employees.csv");
{
var asyncStream = new WithAsyncStreams();
var employees = await asyncStream.ProcessStreamAsync(_filePath);
return employees;
var employees = await asyncStream.ProcessStreamAsync(_filePath);
return employees;
}

[Benchmark]
public void CsvHelper()
{
// var directoryPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(Program))?.Location);
//_filePath = Path.Combine(directoryPath ?? string.Empty, "Employees.csv");
var csvHelper = new WithCsvHelperLib();
var employeesList = csvHelper.ProcessFileAsync(_filePath);

}

[Benchmark]
public void Sylvan()
{
var sylv = new WithSylvanLib();
var pool = ArrayPool<Employee>.Shared;
var employeeRecords = pool.Rent(100000);

try
{
sylv.ProcessFile(_filePath, employeeRecords);
}
finally
{
pool.Return(employeeRecords, true);
}
}

[Benchmark]
public async Task SylvanAsync()
{
var sylv = new WithSylvanLib();
var pool = ArrayPool<Employee>.Shared;
var employeeRecords = pool.Rent(100000);

try
{
await sylv.ProcessFileAsync(_filePath, employeeRecords);
}
finally
{
pool.Return(employeeRecords, true);
}
}
}
}
}