Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use generics in syncing code. #72929

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task TestAssetSynchronization()
var assetSource = new SimpleAssetSource(workspace.Services.GetService<ISerializerService>(), map);

var service = new AssetProvider(sessionId, storage, assetSource, remoteWorkspace.Services.GetService<ISerializerService>());
await service.SynchronizeAssetsAsync(AssetPath.FullLookupForTesting, new HashSet<Checksum>(map.Keys), results: null, CancellationToken.None);
await service.SynchronizeAssetsAsync<object>(AssetPath.FullLookupForTesting, new HashSet<Checksum>(map.Keys), results: null, CancellationToken.None);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<object> is used when the request is genuinely asking for heterogeneous data.


foreach (var kv in map)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Workspaces/CoreTestUtilities/Fakes/SimpleAssetSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ namespace Microsoft.CodeAnalysis.Remote.Testing;
/// </summary>
internal sealed class SimpleAssetSource(ISerializerService serializerService, IReadOnlyDictionary<Checksum, object> map) : IAssetSource
{
public ValueTask<ImmutableArray<object>> GetAssetsAsync(
public ValueTask<ImmutableArray<T>> GetAssetsAsync<T>(
Checksum solutionChecksum, AssetPath assetPath, ReadOnlyMemory<Checksum> checksums, ISerializerService deserializerService, CancellationToken cancellationToken)
{
var results = new List<object>();
var results = new List<T>();

foreach (var checksum in checksums.Span)
{
Expand All @@ -39,7 +39,7 @@ public ValueTask<ImmutableArray<object>> GetAssetsAsync(
using var reader = ObjectReader.GetReader(stream, leaveOpen: true, cancellationToken);
var asset = deserializerService.Deserialize(data.GetWellKnownSynchronizationKind(), reader, cancellationToken);
Contract.ThrowIfNull(asset);
results.Add(asset);
results.Add((T)asset);
}

return ValueTaskFactory.FromResult(results.ToImmutableArray());
Expand Down
12 changes: 6 additions & 6 deletions src/Workspaces/Remote/Core/RemoteHostAssetSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Solut
}
}

public static ValueTask<ImmutableArray<object>> ReadDataAsync(
public static ValueTask<ImmutableArray<T>> ReadDataAsync<T>(
PipeReader pipeReader, Checksum solutionChecksum, int objectCount, ISerializerService serializerService, CancellationToken cancellationToken)
{
// Suppress ExecutionContext flow for asynchronous operations operate on the pipe. In addition to avoiding
Expand All @@ -78,17 +78,17 @@ public static ValueTask<ImmutableArray<object>> ReadDataAsync(
using var _ = FlowControlHelper.TrySuppressFlow();
return ReadDataSuppressedFlowAsync(pipeReader, solutionChecksum, objectCount, serializerService, cancellationToken);

static async ValueTask<ImmutableArray<object>> ReadDataSuppressedFlowAsync(
static async ValueTask<ImmutableArray<T>> ReadDataSuppressedFlowAsync(
PipeReader pipeReader, Checksum solutionChecksum, int objectCount, ISerializerService serializerService, CancellationToken cancellationToken)
{
using var stream = await pipeReader.AsPrebufferedStreamAsync(cancellationToken).ConfigureAwait(false);
return ReadData(stream, solutionChecksum, objectCount, serializerService, cancellationToken);
return ReadData<T>(stream, solutionChecksum, objectCount, serializerService, cancellationToken);
}
}

public static ImmutableArray<object> ReadData(Stream stream, Checksum solutionChecksum, int objectCount, ISerializerService serializerService, CancellationToken cancellationToken)
public static ImmutableArray<T> ReadData<T>(Stream stream, Checksum solutionChecksum, int objectCount, ISerializerService serializerService, CancellationToken cancellationToken)
{
using var _ = ArrayBuilder<object>.GetInstance(objectCount, out var results);
using var _ = ArrayBuilder<T>.GetInstance(objectCount, out var results);

using var reader = ObjectReader.GetReader(stream, leaveOpen: true, cancellationToken);

Expand All @@ -104,7 +104,7 @@ public static ImmutableArray<object> ReadData(Stream stream, Checksum solutionCh
// in service hub, cancellation means simply closed stream
var result = serializerService.Deserialize(kind, reader, cancellationToken);
Contract.ThrowIfNull(result);
results.Add(result);
results.Add((T)result);
}

return results.ToImmutableAndClear();
Expand Down
135 changes: 105 additions & 30 deletions src/Workspaces/Remote/ServiceHub/Host/AssetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -40,24 +39,24 @@ public override async ValueTask<T> GetAssetAsync<T>(
using var _1 = PooledHashSet<Checksum>.GetInstance(out var checksums);
checksums.Add(checksum);

using var _2 = PooledDictionary<Checksum, object>.GetInstance(out var results);
using var _2 = PooledDictionary<Checksum, T>.GetInstance(out var results);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method this is in is already generic, so this now allows that generic type information to flow into SynchronizeAssetsAsync.

await this.SynchronizeAssetsAsync(assetPath, checksums, results, cancellationToken).ConfigureAwait(false);

return (T)results[checksum];
return results[checksum];
}

public override async ValueTask<ImmutableArray<(Checksum checksum, T asset)>> GetAssetsAsync<T>(
AssetPath assetPath, HashSet<Checksum> checksums, CancellationToken cancellationToken)
{
using var _ = PooledDictionary<Checksum, object>.GetInstance(out var results);
using var _ = PooledDictionary<Checksum, T>.GetInstance(out var results);

await this.SynchronizeAssetsAsync(assetPath, checksums, results, cancellationToken).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method this is in is already generic, so this now allows that generic type information to flow into SynchronizeAssetsAsync.


var result = new (Checksum checksum, T asset)[checksums.Count];
var index = 0;
foreach (var (checksum, assetObject) in results)
{
result[index] = (checksum, (T)assetObject);
result[index] = (checksum, assetObject);
index++;
}

Expand All @@ -66,29 +65,57 @@ public override async ValueTask<T> GetAssetAsync<T>(

public async ValueTask SynchronizeSolutionAssetsAsync(Checksum solutionChecksum, CancellationToken cancellationToken)
{
var timer = new Stopwatch();
timer.Start();
var timer = SharedStopwatch.StartNew();

// this will pull in assets that belong to the given solution checksum to this remote host.
// this one is not supposed to be used for functionality but only for perf. that is why it doesn't return anything.
// to get actual data GetAssetAsync should be used. and that will return actual data and if there is any missing data in cache, GetAssetAsync
// itself will bring that data in from data source (VS)
// this will pull in assets that belong to the given solution checksum to this remote host. this one is not
// supposed to be used for functionality but only for perf. that is why it doesn't return anything. to get
// actual data GetAssetAsync should be used. and that will return actual data and if there is any missing data
// in cache, GetAssetAsync itself will bring that data in from data source (VS)

// one can call this method to make cache hot for all assets that belong to the solution checksum so that GetAssetAsync call will most likely cache hit.
// it is most likely since we might change cache hueristic in future which make data to live a lot shorter in the cache, and the data might get expired
// before one actually consume the data.
// one can call this method to make the cache hot for all assets that belong to the solution checksum so that
// a GetAssetAsync call will most likely be a cache hit. it is only "most likely" since we might change the cache
// heuristic in the future to make data live a lot shorter in the cache, and the data might expire before one
// actually consumes it.
using (Logger.LogBlock(FunctionId.AssetService_SynchronizeSolutionAssetsAsync, Checksum.GetChecksumLogInfo, solutionChecksum, cancellationToken))
{
var syncer = new ChecksumSynchronizer(this);
await syncer.SynchronizeSolutionAssetsAsync(solutionChecksum, cancellationToken).ConfigureAwait(false);
await SynchronizeSolutionAssetsWorkerAsync().ConfigureAwait(false);
}

timer.Stop();

// report telemetry to help correlate slow solution sync with UI delays
if (timer.ElapsedMilliseconds > 1000)
var elapsed = timer.Elapsed;
if (elapsed.TotalMilliseconds > 1000)
Logger.Log(FunctionId.AssetService_Perf, KeyValueLogMessage.Create(map => map["SolutionSyncTime"] = elapsed.TotalMilliseconds));

async ValueTask SynchronizeSolutionAssetsWorkerAsync()
{
Logger.Log(FunctionId.AssetService_Perf, KeyValueLogMessage.Create(map => map["SolutionSyncTime"] = timer.ElapsedMilliseconds));
using var _1 = PooledDictionary<Checksum, object>.GetInstance(out var checksumToObjects);

// first, get top level solution state for the given solution checksum
var compilationStateChecksums = await this.GetAssetAsync<SolutionCompilationStateChecksums>(
assetPath: AssetPath.SolutionOnly, solutionChecksum, cancellationToken).ConfigureAwait(false);

using var _2 = PooledHashSet<Checksum>.GetInstance(out var checksums);

// second, get direct children of the solution compilation state.
compilationStateChecksums.AddAllTo(checksums);
await this.SynchronizeAssetsAsync<object>(assetPath: AssetPath.SolutionOnly, checksums, results: null, cancellationToken).ConfigureAwait(false);

// third, get direct children of the solution state.
var stateChecksums = await this.GetAssetAsync<SolutionStateChecksums>(
assetPath: AssetPath.SolutionOnly, compilationStateChecksums.SolutionState, cancellationToken).ConfigureAwait(false);

// Ask for solutions and top-level projects as the solution checksums will contain the checksums for
// the project states and we want to get that all in one batch.
checksums.Clear();
stateChecksums.AddAllTo(checksums);
await this.SynchronizeAssetsAsync(assetPath: AssetPath.SolutionAndTopLevelProjectsOnly, checksums, checksumToObjects, cancellationToken).ConfigureAwait(false);

// fourth, get all projects and documents in the solution
foreach (var (projectChecksum, _) in stateChecksums.Projects)
{
var projectStateChecksums = (ProjectStateChecksums)checksumToObjects[projectChecksum];
await SynchronizeProjectAssetsAsync(projectStateChecksums, cancellationToken).ConfigureAwait(false);
}
}
}

Expand All @@ -105,13 +132,61 @@ public async ValueTask SynchronizeProjectAssetsAsync(ProjectStateChecksums proje
// consume the data.
using (Logger.LogBlock(FunctionId.AssetService_SynchronizeProjectAssetsAsync, Checksum.GetProjectChecksumsLogInfo, projectChecksums, cancellationToken))
{
var syncer = new ChecksumSynchronizer(this);
await syncer.SynchronizeProjectAssetsAsync(projectChecksums, cancellationToken).ConfigureAwait(false);
await SynchronizeProjectAssetsWorkerAsync().ConfigureAwait(false);
}

// Synchronizes the assets of the captured projectChecksums in two batched requests: first everything the
// project checksums reference directly, then the info and text checksums of each of its documents.
async ValueTask SynchronizeProjectAssetsWorkerAsync()
{
    using var _ = PooledHashSet<Checksum>.GetInstance(out var pending);

    // Batch 1: gather every direct child of the project so they can all be requested at once.
    pending.Add(projectChecksums.Info);
    pending.Add(projectChecksums.CompilationOptions);
    pending.Add(projectChecksums.ParseOptions);
    AddAll(pending, projectChecksums.ProjectReferences);
    AddAll(pending, projectChecksums.MetadataReferences);
    AddAll(pending, projectChecksums.AnalyzerReferences);
    AddAll(pending, projectChecksums.Documents.Checksums);
    AddAll(pending, projectChecksums.AdditionalDocuments.Checksums);
    AddAll(pending, projectChecksums.AnalyzerConfigDocuments.Checksums);

    // Requested as <object> with no results dictionary: the batch mixes several kinds of assets and nothing
    // is read back from it here.
    await this.SynchronizeAssetsAsync<object>(
        assetPath: AssetPath.ProjectAndDocuments(projectChecksums.ProjectId),
        pending,
        results: null,
        cancellationToken).ConfigureAwait(false);

    // Reuse the same pooled set for the second batch.
    pending.Clear();

    // Batch 2: gather the children of every regular, additional and analyzer-config document.
    await CollectChecksumChildrenAsync(pending, projectChecksums.Documents).ConfigureAwait(false);
    await CollectChecksumChildrenAsync(pending, projectChecksums.AdditionalDocuments).ConfigureAwait(false);
    await CollectChecksumChildrenAsync(pending, projectChecksums.AnalyzerConfigDocuments).ConfigureAwait(false);

    await this.SynchronizeAssetsAsync<object>(
        assetPath: AssetPath.ProjectAndDocuments(projectChecksums.ProjectId),
        pending,
        results: null,
        cancellationToken).ConfigureAwait(false);
}

// Adds the Info and Text checksums of every document state in 'collection' to 'checksums'.
async ValueTask CollectChecksumChildrenAsync(HashSet<Checksum> checksums, ChecksumsAndIds<DocumentId> collection)
{
    var projectPath = AssetPath.ProjectAndDocuments(projectChecksums.ProjectId);

    // The document state checksums were just synchronized by the caller, so this lookup is expected to be
    // cheap. There's a small chance the asset-cache GC pass has already cleaned them up, but that should be
    // exceedingly rare.
    var documentStates = await this.GetAssetsAsync<DocumentStateChecksums>(
        projectPath, collection.Checksums, cancellationToken).ConfigureAwait(false);

    foreach (var documentState in documentStates)
    {
        checksums.Add(documentState.Info);
        checksums.Add(documentState.Text);
    }
}

// Copies every checksum from 'checksumCollection' into 'checksums'.
static void AddAll(HashSet<Checksum> checksums, ChecksumCollection checksumCollection)
{
    foreach (var item in checksumCollection)
    {
        checksums.Add(item);
    }
}
}

public async ValueTask SynchronizeAssetsAsync(
AssetPath assetPath, HashSet<Checksum> checksums, Dictionary<Checksum, object>? results, CancellationToken cancellationToken)
public async ValueTask SynchronizeAssetsAsync<T>(
AssetPath assetPath, HashSet<Checksum> checksums, Dictionary<Checksum, T>? results, CancellationToken cancellationToken)
{
Contract.ThrowIfTrue(checksums.Contains(Checksum.Null));
if (checksums.Count == 0)
Expand All @@ -134,7 +209,7 @@ public async ValueTask SynchronizeAssetsAsync(
missingChecksumsCount = 0;
foreach (var checksum in checksums)
{
if (_assetCache.TryGetAsset<object>(checksum, out var existing))
if (_assetCache.TryGetAsset<T>(checksum, out var existing))
{
AddResult(checksum, existing);
}
Expand Down Expand Up @@ -163,7 +238,7 @@ public async ValueTask SynchronizeAssetsAsync(
if (missingChecksumsCount > 0)
{
var missingChecksumsMemory = new ReadOnlyMemory<Checksum>(missingChecksums, 0, missingChecksumsCount);
var missingAssets = await RequestAssetsAsync(assetPath, missingChecksumsMemory, cancellationToken).ConfigureAwait(false);
var missingAssets = await RequestAssetsAsync<T>(assetPath, missingChecksumsMemory, cancellationToken).ConfigureAwait(false);

Contract.ThrowIfTrue(missingChecksumsMemory.Length != missingAssets.Length);

Expand All @@ -173,7 +248,7 @@ public async ValueTask SynchronizeAssetsAsync(
var missingAsset = missingAssets[i];

AddResult(missingChecksum, missingAsset);
_assetCache.GetOrAdd(missingChecksum, missingAsset);
_assetCache.GetOrAdd(missingChecksum, missingAsset!);
}
}

Expand All @@ -183,14 +258,14 @@ public async ValueTask SynchronizeAssetsAsync(

return;

void AddResult(Checksum checksum, object result)
void AddResult(Checksum checksum, T result)
{
if (results != null)
results[checksum] = result;
}
}

private async ValueTask<ImmutableArray<object>> RequestAssetsAsync(
private async ValueTask<ImmutableArray<T>> RequestAssetsAsync<T>(
AssetPath assetPath, ReadOnlyMemory<Checksum> checksums, CancellationToken cancellationToken)
{
#if NETCOREAPP
Expand All @@ -202,6 +277,6 @@ private async ValueTask<ImmutableArray<object>> RequestAssetsAsync(
if (checksums.Length == 0)
return [];

return await _assetSource.GetAssetsAsync(_solutionChecksum, assetPath, checksums, _serializerService, cancellationToken).ConfigureAwait(false);
return await _assetSource.GetAssetsAsync<T>(_solutionChecksum, assetPath, checksums, _serializerService, cancellationToken).ConfigureAwait(false);
}
}
Loading