-
Notifications
You must be signed in to change notification settings - Fork 43
Async
This is an ongoing collection of ideas on how to make SpanJson async.
- Not one large buffer for reading/writing, instead async reading/writing in one or smaller buffers and
- byref types like JsonReader/JsonWriter can't be used in async methods
- Expression Trees don't support async await
- Strings, Lists have dynamic length and might require multiple flushes / multiple reads
- Reading Strings might require resizing of the read buffer, so all parts of the string fit into memory or use something like
ReadOnlySequence
I used ListFormatter as an example on how to make the library async as lists have a dynamic length and might require flushing the buffer multiple times.
/// <summary>
/// Writes the list elements
/// Idea:
/// - Write everything synchronously until our buffer is full (enough)
/// - Flush it and either continue synchronously, if the flush is completed already, or asynchronously with the remaining elements
/// </summary>
public ValueTask SerializeAsync(AsyncWriter<TSymbol> asyncWriter, TList value, int nestingLimit, CancellationToken cancellationToken = default)
{
var maxSafeWriteSize = asyncWriter.MaxSafeWriteSize;
var writer = asyncWriter.Create();
if (value == null)
{
writer.WriteNull();
return new ValueTask(asyncWriter.FlushAsync(writer.Position, cancellationToken));
}
var nextNestingLimit = RecursionCandidate<T>.IsRecursionCandidate ? nestingLimit + 1 : nestingLimit;
var valueLength = value.Count;
writer.WriteBeginArray();
if (valueLength > 0)
{
// For now assume that this doesn't overflow the buffer, the same sync/async approach will apply here
SerializeRuntimeDecisionInternal<T, TSymbol, TResolver>(ref writer, value[0], ElementFormatter, nextNestingLimit);
for (var i = 1; i < valueLength; i++)
{
writer.WriteValueSeparator();
SerializeRuntimeDecisionInternal<T, TSymbol, TResolver>(ref writer, value[i], ElementFormatter, nextNestingLimit);
if (writer.Position > maxSafeWriteSize)
{
var task = asyncWriter.FlushAsync(writer.Position, cancellationToken);
if (task.IsCompleted) // if it is sync we reset the writer position and continue synchronously with the next value
{
writer.Reset();
}
else
{
// It's async, we need to call the async wrapper method and continue from there
// this stackoverflows for large sizes
return SerializeArrayElementsAsync(asyncWriter, task, value, i + 1, nextNestingLimit, cancellationToken);
}
}
}
}
writer.WriteEndArray();
return new ValueTask(asyncWriter.FlushAsync(writer.Position, cancellationToken));
}
/// <summary>
/// awaits the async task and continues writing the remaining list elements
/// </summary>
private async ValueTask SerializeArrayElementsAsync(AsyncWriter<TSymbol> asyncWriter, Task task, TList value, int nextIndex, int nestingLimit,
CancellationToken cancellationToken = default)
{
await task.ConfigureAwait(false);
await SerializeArrayElementsAsync(asyncWriter, value, nextIndex, nestingLimit, cancellationToken).ConfigureAwait(false); // call the
}
/// <summary>
/// Writes the remaining list elements
/// </summary>
private ValueTask SerializeArrayElementsAsync(AsyncWriter<TSymbol> asyncWriter, TList value, int nextIndex, int nestingLimit,
CancellationToken cancellationToken = default)
{
var maxSafeWriteSize = asyncWriter.MaxSafeWriteSize;
var writer = asyncWriter.Create();
var valueLength = value.Count;
for (var i = nextIndex; i < valueLength; i++)
{
writer.WriteValueSeparator();
SerializeRuntimeDecisionInternal<T, TSymbol, TResolver>(ref writer, value[i], ElementFormatter, nestingLimit);
if (writer.Position > maxSafeWriteSize)
{
var task = asyncWriter.FlushAsync(writer.Position, cancellationToken);
if (task.IsCompleted) // if it is sync we reset the writer position and continue synchronously with the next value
{
writer.Reset();
}
else
{
// It's async, we need to call the async wrapper method and continue from there
// this stackoverflows for large sizes
return SerializeArrayElementsAsync(asyncWriter, task, value, i + 1, nestingLimit, cancellationToken);
}
}
}
writer.WriteEndArray();
return new ValueTask(asyncWriter.FlushAsync(writer.Position, cancellationToken));
}
The above concept, e.g. recursively calling the async flush method and continuing the serialization with the remaining list values will result in StackOverflow for large lists in combination with smaller buffers.
Performance Writing everything synchronously, i.e. every flush is finished synchronously, is pretty much as fast as the normal sync method (there is practically no difference in codeflow anyway.) Asynchronously is roughly 25% slower.
This version circumvents the StackOverflow by not doing recursive calls, but instead having custom writer logic for the structural json parts (BeginArray, EndArray, etc.).
/// <summary>
/// Writes the list elements
/// </summary>
public async ValueTask SerializeAsync(AsyncWriter<TSymbol> asyncWriter, TList value, int nestingLimit, CancellationToken cancellationToken = default)
{
if (value == null)
{
asyncWriter.WriteNull();
await asyncWriter.FlushAsync(cancellationToken);
}
var nextNestingLimit = RecursionCandidate<T>.IsRecursionCandidate ? nestingLimit + 1 : nestingLimit;
var valueLength = value.Count;
asyncWriter.WriteBeginArray();
if (valueLength > 0)
{
asyncWriter.Position = WriteElementSync(asyncWriter, value[0], nextNestingLimit);
for (var i = 1; i < valueLength; i++)
{
asyncWriter.WriteValueSeparator();
asyncWriter.Position = WriteElementSync(asyncWriter, value[i], nextNestingLimit);
if (asyncWriter.Position > asyncWriter.MaxSafeWriteSize)
{
await asyncWriter.FlushAsync(cancellationToken);
}
}
asyncWriter.WriteEndArray();
}
await asyncWriter.FlushAsync(cancellationToken);
}
private int WriteElementSync(AsyncWriter<TSymbol> asyncWriter, T value, int nextNestingLimit)
{
var writer = asyncWriter.Create();
SerializeRuntimeDecisionInternal<T, TSymbol, TResolver>(ref writer, value, ElementFormatter, nextNestingLimit);
return writer.Position;
}
The above version requires a copy of the jsonwriter/reader logic for all structural elements and probably a few other elements too (e.g. string).
Performance This concept is already ~25-50% slower for the synchronous version, the async version is close to twice as slow.