Skip to content

Commit

Permalink
Revise connection settings (#58)
Browse files Browse the repository at this point in the history
* revisit the mechanism for specifying alternate page blob storage: use more descriptive names, and implement deletion

* fix missing code.

* fix bug

* fix checkpoint devices so they use page blob storage as well.

(cherry picked from commit 1ea23e69899798e61f54eeba9dfeacbac2d4c483)
  • Loading branch information
sebastianburckhardt authored Jun 24, 2021
1 parent fbe0454 commit 6621e9a
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ IPartitionState IStorageProvider.CreatePartitionState()
return new MemoryStorage(this.Logger);

case TransportConnectionString.StorageChoices.Faster:
return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.PremiumStorageConnectionName, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory);
return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.ResolvedPageBlobStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory);

default:
throw new NotImplementedException("no such storage choice");
Expand All @@ -203,7 +203,11 @@ async Task IStorageProvider.DeleteAllPartitionStatesAsync()
break;

case TransportConnectionString.StorageChoices.Faster:
await Faster.FasterStorage.DeleteTaskhubStorageAsync(this.Settings.ResolvedStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName).ConfigureAwait(false);
await Faster.FasterStorage.DeleteTaskhubStorageAsync(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedPageBlobStorageConnectionString,
this.Settings.UseLocalDirectoryForPartitionStorage,
this.Settings.HubName).ConfigureAwait(false);
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@ public class NetheriteOrchestrationServiceSettings
public int PackPartitionTaskMessages { get; set; } = 100;

/// <summary>
/// Gets or sets the name used for resolving the premium Azure storage connection string, if used.
/// A name for resolving a storage connection string to be used specifically for the page blobs, or null if page blobs are to be stored in the default account.
/// </summary>
public string PremiumStorageConnectionName { get; set; } = null;
public string PageBlobStorageConnectionName { get; set; } = null;

/// <summary>
/// The resolved page blob storage connection string, or null if page blobs are to be stored in the default account. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
internal bool UsePremiumStorage => !string.IsNullOrEmpty(this.PremiumStorageConnectionName);
public string ResolvedPageBlobStorageConnectionString { get; set; }

[JsonIgnore]
internal bool UseSeparatePageBlobStorage => !string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString);

/// <summary>
/// A lower limit on the severity level of trace events emitted by the transport layer.
Expand Down Expand Up @@ -224,6 +230,17 @@ public void Validate(Func<string,string> connectionStringResolver)
}
}

if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString)
&& !string.IsNullOrEmpty(this.PageBlobStorageConnectionName))
{
this.ResolvedPageBlobStorageConnectionString = connectionStringResolver(this.PageBlobStorageConnectionName);

if (string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString))
{
throw new InvalidOperationException($"Could not resolve {nameof(this.PageBlobStorageConnectionName)}:{this.PageBlobStorageConnectionName} for Netherite storage provider.");
}
}

if (string.IsNullOrEmpty(this.ResolvedTransportConnectionString))
{
if (string.IsNullOrEmpty(this.EventHubsConnectionName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
readonly uint partitionId;
readonly CancellationTokenSource shutDownOrTermination;
readonly CloudStorageAccount cloudStorageAccount;
readonly CloudStorageAccount secondaryStorageAccount;
readonly CloudStorageAccount pageBlobAccount;

readonly CloudBlobContainer blockBlobContainer;
readonly CloudBlobContainer pageBlobContainer;
Expand Down Expand Up @@ -70,28 +70,28 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
internal const long HashTableSize = 1L << 14; // 16 k buckets, 1 GB
//internal const long HashTableSize = 1L << 14; // 8 M buckets, 512 GB

public FasterLogSettings EventLogSettings(bool usePremiumStorage) => new FasterLogSettings
public FasterLogSettings EventLogSettings(bool useSeparatePageBlobStorage) => new FasterLogSettings
{
LogDevice = this.EventLogDevice,
LogCommitManager = this.UseLocalFiles
? new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}")
: (ILogCommitManager)this,
PageSizeBits = 21, // 2MB
SegmentSizeBits =
usePremiumStorage ? 35 // 32 GB
: 30, // 1 GB
useSeparatePageBlobStorage ? 35 // 32 GB
: 30, // 1 GB
MemorySizeBits = 22, // 2MB
};

public LogSettings StoreLogSettings(bool usePremiumStorage, uint numPartitions) => new LogSettings
public LogSettings StoreLogSettings(bool useSeparatePageBlobStorage, uint numPartitions) => new LogSettings
{
LogDevice = this.HybridLogDevice,
ObjectLogDevice = this.ObjectLogDevice,
PageSizeBits = 17, // 128kB
MutableFraction = 0.9,
SegmentSizeBits =
usePremiumStorage ? 35 // 32 GB
: 32, // 4 GB
useSeparatePageBlobStorage ? 35 // 32 GB
: 32, // 4 GB
CopyReadsToTail = true,
MemorySizeBits =
(numPartitions <= 1) ? 25 : // 32MB
Expand Down Expand Up @@ -204,16 +204,16 @@ public static TimeSpan GetDelayBetweenRetries(int numAttempts)
=> TimeSpan.FromSeconds(Math.Pow(2, (numAttempts - 1)));

// For tests only; TODO consider adding PSFs
internal BlobManager(CloudStorageAccount storageAccount, CloudStorageAccount secondaryStorageAccount, string localFileDirectory, string taskHubName, ILogger logger, Microsoft.Extensions.Logging.LogLevel logLevelLimit, uint partitionId, IPartitionErrorHandler errorHandler)
: this(storageAccount, secondaryStorageAccount, localFileDirectory, taskHubName, logger, logLevelLimit, partitionId, errorHandler, 0)
internal BlobManager(CloudStorageAccount storageAccount, CloudStorageAccount pageBlobAccount, string localFileDirectory, string taskHubName, ILogger logger, Microsoft.Extensions.Logging.LogLevel logLevelLimit, uint partitionId, IPartitionErrorHandler errorHandler)
: this(storageAccount, pageBlobAccount, localFileDirectory, taskHubName, logger, logLevelLimit, partitionId, errorHandler, 0)
{
}

/// <summary>
/// Create a blob manager.
/// </summary>
/// <param name="storageAccount">The cloud storage account, or null if using local file paths</param>
/// <param name="secondaryStorageAccount">Optionally, a secondary cloud storage accounts</param>
/// <param name="pageBlobAccount">The storage account to use for page blobs</param>
/// <param name="localFilePath">The local file path, or null if using cloud storage</param>
/// <param name="taskHubName">The name of the taskhub</param>
/// <param name="logger">A logger for logging</param>
Expand All @@ -223,7 +223,7 @@ internal BlobManager(CloudStorageAccount storageAccount, CloudStorageAccount sec
/// <param name="psfGroupCount">Number of PSF groups to be created in FASTER</param>
public BlobManager(
CloudStorageAccount storageAccount,
CloudStorageAccount secondaryStorageAccount,
CloudStorageAccount pageBlobAccount,
string localFilePath,
string taskHubName,
ILogger logger,
Expand All @@ -232,7 +232,7 @@ public BlobManager(
int psfGroupCount)
{
this.cloudStorageAccount = storageAccount;
this.secondaryStorageAccount = secondaryStorageAccount;
this.pageBlobAccount = pageBlobAccount;
this.UseLocalFiles = (localFilePath != null);
this.LocalFileDirectoryForTestingAndDebugging = localFilePath;
this.ContainerName = GetContainerName(taskHubName);
Expand All @@ -244,8 +244,16 @@ public BlobManager(
{
CloudBlobClient serviceClient = this.cloudStorageAccount.CreateCloudBlobClient();
this.blockBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
serviceClient = this.secondaryStorageAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);

if (pageBlobAccount == storageAccount)
{
this.pageBlobContainer = this.BlockBlobContainer;
}
else
{
serviceClient = this.pageBlobAccount.CreateCloudBlobClient();
this.pageBlobContainer = serviceClient.GetContainerReference(this.ContainerName);
}
}
else
{
Expand Down Expand Up @@ -303,10 +311,18 @@ public async Task StartAsync()
else
{
await this.blockBlobContainer.CreateIfNotExistsAsync();
await this.pageBlobContainer.CreateIfNotExistsAsync();
this.pageBlobPartitionDirectory = this.pageBlobContainer.GetDirectoryReference(this.PartitionFolderName);
this.blockBlobPartitionDirectory = this.blockBlobContainer.GetDirectoryReference(this.PartitionFolderName);

if (this.pageBlobContainer == this.blockBlobContainer)
{
this.pageBlobPartitionDirectory = this.blockBlobPartitionDirectory;
}
else
{
await this.pageBlobContainer.CreateIfNotExistsAsync();
this.pageBlobPartitionDirectory = this.pageBlobContainer.GetDirectoryReference(this.PartitionFolderName);
}

this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(CommitBlobName);

AzureStorageDevice createDevice(string name) =>
Expand All @@ -317,8 +333,9 @@ AzureStorageDevice createDevice(string name) =>
var objectLogDevice = createDevice(ObjectLogBlobName);

var psfLogDevices = (from groupOrdinal in Enumerable.Range(0, this.PsfGroupCount)
let psfDirectory = this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
select new AzureStorageDevice(PsfHybridLogBlobName, psfDirectory.GetDirectoryReference(PsfHybridLogBlobName), psfDirectory.GetDirectoryReference(PsfHybridLogBlobName), this, true)).ToArray();
let psfblockDirectory = this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
let psfpageDirectory = this.pageBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(groupOrdinal))
select new AzureStorageDevice(PsfHybridLogBlobName, psfblockDirectory.GetDirectoryReference(PsfHybridLogBlobName), psfpageDirectory.GetDirectoryReference(PsfHybridLogBlobName), this, true)).ToArray();

await this.AcquireOwnership();

Expand Down Expand Up @@ -362,7 +379,7 @@ public async Task StopAsync()
await this.LeaseMaintenanceLoopTask; // wait for loop to terminate cleanly
}

public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, string localFileDirectoryPath, string taskHubName)
public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account, CloudStorageAccount pageBlobAccount, string localFileDirectoryPath, string taskHubName)
{
var containerName = GetContainerName(taskHubName);

Expand All @@ -376,21 +393,31 @@ public static async Task DeleteTaskhubStorageAsync(CloudStorageAccount account,
}
else
{
CloudBlobClient serviceClient = account.CreateCloudBlobClient();
var blobContainer = serviceClient.GetContainerReference(containerName);

if (await blobContainer.ExistsAsync())
async Task DeleteContainerContents(CloudStorageAccount account)
{
// do a complete deletion of all contents of this directory
var tasks = blobContainer.ListBlobs(null, true)
.Where(blob => blob.GetType() == typeof(CloudBlob) || blob.GetType().BaseType == typeof(CloudBlob))
.Select(blob => BlobUtils.ForceDeleteAsync((CloudBlob)blob))
.ToArray();
await Task.WhenAll(tasks);
CloudBlobClient serviceClient = account.CreateCloudBlobClient();
var blobContainer = serviceClient.GetContainerReference(containerName);

if (await blobContainer.ExistsAsync())
{
// do a complete deletion of all contents of this directory
var tasks = blobContainer.ListBlobs(null, true)
.Where(blob => blob.GetType() == typeof(CloudBlob) || blob.GetType().BaseType == typeof(CloudBlob))
.Select(blob => BlobUtils.ForceDeleteAsync((CloudBlob)blob))
.ToArray();
await Task.WhenAll(tasks);
}

// We are not deleting the container itself because it creates problems when trying to recreate
// the same container soon afterwards so we leave an empty container behind. Oh well.
}

// We are not deleting the container itself because it creates problems when trying to recreate
// the same container soon afterwards so we leave an empty container behind. Oh well.
await DeleteContainerContents(account);

if (pageBlobAccount != account)
{
await DeleteContainerContents(pageBlobAccount);
}
}
}

Expand Down Expand Up @@ -975,16 +1002,23 @@ internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal)
return result;
}

void GetPartitionDirectories(bool isPsf, int psfGroupOrdinal, string path, out CloudBlobDirectory blockBlobDir, out CloudBlobDirectory pageBlobDir)
{
var blockPartDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
blockBlobDir = blockPartDir.GetDirectoryReference(path);
var pagePartDir = isPsf ? this.pageBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.pageBlobPartitionDirectory;
pageBlobDir = pagePartDir.GetDirectoryReference(path);
}

internal IDevice GetIndexDevice(Guid indexToken, int psfGroupOrdinal)
{
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Called on {tag}, indexToken={indexToken}");
var (path, blobName) = this.GetPrimaryHashTableBlobName(indexToken);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Returned from {tag}, target={blobDirectory.Prefix}{blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexDevice Returned from {tag}, target={blockBlobDir.Prefix}{blobName}");
return device;
}

Expand All @@ -993,11 +1027,10 @@ internal IDevice GetSnapshotLogDevice(Guid token, int psfGroupOrdinal)
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Called on {tag}, token={token}");
var (path, blobName) = this.GetLogSnapshotBlobName(token);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Returned from {tag}, blobDirectory={blobDirectory} blobName={blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotLogDevice Returned from {tag}, blobDirectory={blockBlobDir} blobName={blobName}");
return device;
}

Expand All @@ -1006,11 +1039,10 @@ internal IDevice GetSnapshotObjectLogDevice(Guid token, int psfGroupOrdinal)
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Called on {tag}, token={token}");
var (path, blobName) = this.GetObjectLogSnapshotBlobName(token);
var partDir = isPsf ? this.blockBlobPartitionDirectory.GetDirectoryReference(this.PsfGroupFolderName(psfGroupOrdinal)) : this.blockBlobPartitionDirectory;
var blobDirectory = partDir.GetDirectoryReference(path);
var device = new AzureStorageDevice(blobName, blobDirectory, blobDirectory, this, false); // we don't need a lease since the token provides isolation
this.GetPartitionDirectories(isPsf, psfGroupOrdinal, path, out var blockBlobDir, out var pageBlobDir);
var device = new AzureStorageDevice(blobName, blockBlobDir, pageBlobDir, this, false); // we don't need a lease since the token provides isolation
device.StartAsync().Wait();
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Returned from {tag}, blobDirectory={blobDirectory} blobName={blobName}");
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetSnapshotObjectLogDevice Returned from {tag}, blobDirectory={blockBlobDir} blobName={blobName}");
return device;
}

Expand Down
Loading

0 comments on commit 6621e9a

Please sign in to comment.