diff --git a/OrleansShardedStorage.sln b/OrleansShardedStorage.sln index 1f79f69..dd38908 100644 --- a/OrleansShardedStorage.sln +++ b/OrleansShardedStorage.sln @@ -19,7 +19,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OrleansShardedStorageProvid EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UnitTests", "TestApplication\UnitTests\UnitTests.csproj", "{EE88F5E1-EEFE-4D80-97B7-EA6E5DF36908}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZDataFinder", "ZDataFinder\ZDataFinder.csproj", "{472CC87D-88D3-4B87-861C-98303964306C}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZDataFinder", "ZDataFinder\ZDataFinder.csproj", "{472CC87D-88D3-4B87-861C-98303964306C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OrleansShardedStorageProvider2", "OrleansShardedStorageProvider2\OrleansShardedStorageProvider2.csproj", "{9BE04697-080C-494D-9015-A70AB0A01F9C}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -55,6 +57,10 @@ Global {472CC87D-88D3-4B87-861C-98303964306C}.Debug|Any CPU.Build.0 = Debug|Any CPU {472CC87D-88D3-4B87-861C-98303964306C}.Release|Any CPU.ActiveCfg = Release|Any CPU {472CC87D-88D3-4B87-861C-98303964306C}.Release|Any CPU.Build.0 = Release|Any CPU + {9BE04697-080C-494D-9015-A70AB0A01F9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9BE04697-080C-494D-9015-A70AB0A01F9C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9BE04697-080C-494D-9015-A70AB0A01F9C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9BE04697-080C-494D-9015-A70AB0A01F9C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -67,6 +73,7 @@ Global {62FEC839-F350-4F24-921B-AA18BC516013} = {DCD6C532-ADC2-4A49-A7DF-2D559934912C} {EE88F5E1-EEFE-4D80-97B7-EA6E5DF36908} = {139B6E56-2FD9-4313-BC10-BF974F391AF0} {472CC87D-88D3-4B87-861C-98303964306C} = {139B6E56-2FD9-4313-BC10-BF974F391AF0} + {9BE04697-080C-494D-9015-A70AB0A01F9C} = {DCD6C532-ADC2-4A49-A7DF-2D559934912C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B78AD82C-DEA2-4C5F-AF2C-1D9F3CABEE3D} diff --git a/OrleansShardedStorageProvider/AzureShardedGrainBase.cs b/OrleansShardedStorageProvider/AzureShardedGrainBase.cs deleted file mode 100644 index 8821c2d..0000000 --- a/OrleansShardedStorageProvider/AzureShardedGrainBase.cs +++ /dev/null @@ -1,84 +0,0 @@ -using Orleans.Runtime; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace OrleansShardedStorageProvider -{ - public class AzureShardedGrainBase - { - protected readonly string _serviceId; - protected readonly AzureShardedStorageOptions _options; - - public AzureShardedGrainBase(string serviceId, AzureShardedStorageOptions options) - { - _serviceId = serviceId; - _options = options; - } - - public int GetShardNumberFromKey(string pk) - { - var hash = GetStableHashCode(pk); - var storageNum = Math.Abs(hash % this._options.ConnectionStrings.Count()); - - return storageNum; - } - - /// - /// Take from https://stackoverflow.com/a/36845864/852806 - /// - /// - /// - protected int GetStableHashCode(string str) - { - unchecked - { - int hash1 = 5381; - int hash2 = hash1; - - for (int i = 0; i < str.Length && str[i] != '\0'; i += 2) - { - hash1 = ((hash1 << 5) + hash1) ^ str[i]; - if (i == str.Length - 1 || str[i + 1] == '\0') - break; - hash2 = ((hash2 << 5) + hash2) ^ str[i + 1]; - } - - return hash1 + (hash2 * 1566083941); - } - } - - - protected const string KeyStringSeparator = "__"; - - public string GetKeyStringSeparator() - { - return KeyStringSeparator; - } - - protected string GetKeyString(GrainId grainId) - { - var key = $"{this._serviceId}{KeyStringSeparator}{grainId.ToString()}"; - - return SanitizeTableProperty(key); - } - - protected string SanitizeTableProperty(string key) - { - // Remove any characters that can't be used in Azure PartitionKey or RowKey values - // http://www.jamestharpe.com/web-development/azure-table-service-character-combinations-disallowed-in-partitionkey-rowkey/ - key = key - .Replace('/', '_') // Forward slash - .Replace('\\', '_') // Backslash - .Replace('#', '_') // Pound sign - .Replace('?', '_'); // Question mark - - if (key.Length >= 1024) - throw new ArgumentException(string.Format("Key length {0} is too long to be an Azure table key. Key={1}", key.Length, key)); - - return key; - } - } -} diff --git a/OrleansShardedStorageProvider/AzureShardedGrainStorage.cs b/OrleansShardedStorageProvider/AzureShardedGrainStorage.cs deleted file mode 100644 index d39ef21..0000000 --- a/OrleansShardedStorageProvider/AzureShardedGrainStorage.cs +++ /dev/null @@ -1,414 +0,0 @@ -using Azure; -using Azure.Data.Tables; -using Azure.Identity; -using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Newtonsoft.Json; -using Orleans; -using Orleans.Configuration; -using Orleans.Runtime; -using Orleans.Storage; -using System.Diagnostics; -using System.Text; - -namespace OrleansShardedStorageProvider -{ - /// - /// Origin: https://github.com/JsAndDotNet/OrleansShardedStorage - /// Similar to Oreleans:src\Azure\Orleans.Persistence.AzureStorage\Providers\Storage\AzureTableStorage.cs - /// - public class AzureShardedGrainStorage : AzureShardedGrainBase, IGrainStorage, ILifecycleParticipant - { - private readonly string _name; - private readonly ILogger _logger; - private List _tableClients = new List(); - private List _blobClients = new List(); - private StorageType _storageType = StorageType.TableStorage; - - - public AzureShardedGrainStorage(string name, AzureShardedStorageOptions options, IOptions clusterOptions, ILoggerFactory loggerFactory) - : base(clusterOptions.Value.ServiceId, options) - { - this._name = name; - var loggerName = $"{typeof(AzureShardedGrainStorage).FullName}.{name}"; - this._logger = loggerFactory.CreateLogger(loggerName); - } - - public void Participate(ISiloLifecycle lifecycle) - { - lifecycle.Subscribe(OptionFormattingUtilities.Name(this._name), this._options.InitStage, this.Init, this.Close); - } - - - public async Task Init(CancellationToken ct) - { - var stopWatch = Stopwatch.StartNew(); - - // This is required to give more detailed logging if it errors. - int exConfigIdx = -1; - - try - { - var initMsg = string.Format("Init: Name={0} ServiceId={1}", this._name, this._serviceId); - - this._logger.LogInformation($"Azure File Storage Grain Storage {this._name} is initializing: {initMsg}"); - - foreach (var storage in this._options.ConnectionStrings) - { - exConfigIdx++; - - if (storage.StorageType == StorageType.TableStorage) - { - _storageType = StorageType.TableStorage; - - var shareClient = String.IsNullOrEmpty(storage.SasToken) ? - new TableServiceClient(storage.BaseTableUri, new DefaultAzureCredential()) : - new TableServiceClient(storage.BaseTableUri, new AzureSasCredential(storage.SasToken)); - - var table = await shareClient.CreateTableIfNotExistsAsync(storage.TableOrContainerName); - - - var tableClient = new TableClient( - storage.TableStorageUri, - new AzureSasCredential(storage.SasToken)); - - this._tableClients.Add(tableClient); - } - else if (storage.StorageType == StorageType.BlobStorage) - { - _storageType = StorageType.BlobStorage; - - BlobServiceClient blobServiceClient = (null == storage.SasCredential) ? - new BlobServiceClient(storage.BaseBlobUri, new DefaultAzureCredential()) : - new BlobServiceClient(storage.BaseBlobUri, storage.SasCredential); - - var containerClient = blobServiceClient.GetBlobContainerClient(storage.TableOrContainerName); - await containerClient.CreateIfNotExistsAsync(); - - this._blobClients.Add(containerClient); - } - else - { - throw new NotImplementedException("type not implmeneted"); - } - } - - stopWatch.Stop(); - this._logger.LogInformation($"Initializing provider {this._name} of type {this.GetType().Name} in stage {this._options.InitStage} took {stopWatch.ElapsedMilliseconds} Milliseconds."); - } - catch (Exception exc) - { - stopWatch.Stop(); - - string whereString = "where unknown placeholder"; - if (exConfigIdx >= 0) - { - var excon = this._options.ConnectionStrings[exConfigIdx]; - var exStorageAcct = excon.AccountName; - var exStorageType = excon.StorageType.ToString(); - whereString = $"CN:{exConfigIdx},Name:{exStorageAcct},Type:{exStorageType}. "; - } - - this._logger.LogError($"{whereString}. Initialization failed for provider {this._name} of type {this.GetType().Name} in stage {this._options.InitStage} in {stopWatch.ElapsedMilliseconds} Milliseconds.", exc); - throw; - } - } - - async Task IGrainStorage.ReadStateAsync(string stateName, GrainId grainId, IGrainState grainState) - { - if (this._storageType == StorageType.TableStorage && - (this._tableClients == null || !this._tableClients.Any())) throw new ArgumentException("GrainState collection not initialized."); - - // This is required to give more detailed logging if it errors. - int exConfigIdx = -1; - - try - { - // NOTE: grainId does not always match the number expected for int keys, but they are consistent - var pk = GetKeyString(grainId); - var connectionIndex = GetShardNumberFromKey(pk); - var rowKey = SanitizeTableProperty(stateName); - exConfigIdx = connectionIndex; - - if (this._storageType == StorageType.TableStorage) - { - // NOTE: This will error if the row doesn't exist - it's disputed functionality from the Azure team - // In orleans, they just swallow the error, so we're doing the same - // See discussion here - https://github.com/Azure/azure-sdk-for-net/issues/16251 - // and Orleans Code here - {orleans}\src\Azure\Shared\Storage\AzureTableDataManager.cs Method ReadSingleTableEntryAsync - // This is quicker than Query once a row is there, so what we lose to start, we more than gain in speed later. Don't change it! - var res = await _tableClients[connectionIndex].GetEntityAsync(pk, rowKey); - - if (res != null) - { - var stringData = res.Value["StringData"].ToString(); - - if (!String.IsNullOrWhiteSpace(stringData)) - { - using (JsonTextReader jsonReader = - new JsonTextReader(new StringReader(stringData))) - { - JsonSerializer ser = new JsonSerializer(); - grainState.State = ser.Deserialize(jsonReader); - } - } - - grainState.RecordExists = grainState.State != null; - grainState.ETag = res.Value.ETag.ToString(); - } - - if (grainState.State == null) - { - grainState.State = Activator.CreateInstance(); - grainState.RecordExists = true; - } - } - else if (this._storageType == StorageType.BlobStorage) - { - var key = pk + "_" + rowKey; - var containerClient = _blobClients[connectionIndex]; - BlobClient blobClient = containerClient.GetBlobClient(key); - - var exists = await blobClient.ExistsAsync(); - - if (exists) - { - var download = await blobClient.DownloadContentAsync(); - BinaryData binData = download.Value.Content; - - //var bytes = binData.ToArray(); - var stringData = Encoding.UTF8.GetString(binData); - - if (!String.IsNullOrWhiteSpace(stringData)) - { - using (JsonTextReader jsonReader = - new JsonTextReader(new StringReader(stringData))) - { - JsonSerializer ser = new JsonSerializer(); - grainState.State = ser.Deserialize(jsonReader); - } - } - - grainState.RecordExists = grainState.State != null; - // Note: ETag is important for optimistic concurrency - grainState.ETag = download.Value.Details.ETag.ToString(); - } - } - else - { - throw new NotImplementedException("type not implemented for read"); - } - } - catch (Exception exc) - { - var errorString = exc.ToString(); - - // See comments above for GetEntityAsync error details - if (errorString.Contains("The specified resource does not exist") || - errorString.Contains("The specified blob does not exist")) - { - // We expect this error. There's nothing we can do about it. See comments above. - } - else - { - string grainRef = $"Failure reading state for Grain Type {stateName} with Id {grainId}."; - string whereMsg = "unknown location placeholder." + grainRef; - if (exConfigIdx >= 0) - { - var conx = this._options.ConnectionStrings[exConfigIdx]; - var exAcctName = conx.AccountName; - var exAcctType = conx.StorageType.ToString(); - var exTblCtrName = conx.TableOrContainerName; - - whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; - } - - var overall = whereMsg + exc.ToString(); - - this._logger.LogError(overall, grainRef); - throw; - } - } - - - } - - - async Task IGrainStorage.WriteStateAsync(string stateName, GrainId grainId, IGrainState grainState) - { - if (this._storageType == StorageType.TableStorage && (this._tableClients == null || !this._tableClients.Any())) throw new ArgumentException("GrainState collection not initialized."); - if (this._storageType == StorageType.BlobStorage && (this._blobClients == null || !this._blobClients.Any())) throw new ArgumentException("GrainState collection not initialized."); - - // This is required to give more detailed logging if it errors. - int exConfigIdx = -1; - - try - { - // NOTE: grainId does not always match the number expected for int keys, but they are consistent - var pk = GetKeyString(grainId); - var connectionIndex = GetShardNumberFromKey(pk); - var rowKey = SanitizeTableProperty(stateName); - exConfigIdx = connectionIndex; - - if (this._storageType == StorageType.TableStorage) - { - var entity = new TableEntity(pk, rowKey) - { - ETag = new ETag(grainState.ETag), - }; - - JsonSerializer ser = new JsonSerializer(); - StringBuilder sb = new StringBuilder(); - using (StringWriter sw = new StringWriter(sb)) - using (JsonWriter writer = new JsonTextWriter(sw)) - { - ser.Serialize(writer, grainState.State); - } - - entity["StringData"] = sb.ToString(); - - // TODO: LEARN - Should we check ETag and decide whether to update based on it??? - var opResult = await this._tableClients[connectionIndex].UpsertEntityAsync(entity); - // Note: ETag is important for optimistic concurrency - grainState.ETag = opResult.Headers.ETag.GetValueOrDefault().ToString(); - grainState.RecordExists = true; - } - else if (this._storageType == StorageType.BlobStorage) - { - JsonSerializer ser = new JsonSerializer(); - StringBuilder sb = new StringBuilder(); - using (StringWriter sw = new StringWriter(sb)) - using (JsonWriter writer = new JsonTextWriter(sw)) - { - ser.Serialize(writer, grainState.State); - } - - var rawContent = sb.ToString(); - var bytes = Encoding.UTF8.GetBytes(rawContent); - BinaryData binaryData = new BinaryData(bytes); - - var key = pk + "_" + rowKey; - var containerClient = _blobClients[connectionIndex]; - BlobClient blobClient = containerClient.GetBlobClient(key); - var upload = await blobClient.UploadAsync(binaryData, overwrite: true); - // Note: ETag is important for optimistic concurrency - grainState.ETag = upload.Value.ETag.ToString(); - grainState.RecordExists = true; - } - else - { - throw new NotImplementedException("type not implemented for read"); - } - } - catch (Exception exc) - { - string grainRef = $"Failure WRITING state for Grain Type {stateName} with Id {grainId}."; - string whereMsg = "unknown location placeholder." + grainRef; - if (exConfigIdx >= 0) - { - var conx = this._options.ConnectionStrings[exConfigIdx]; - var exAcctName = conx.AccountName; - var exAcctType = conx.StorageType.ToString(); - var exTblCtrName = conx.TableOrContainerName; - - whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; - } - - var overall = whereMsg + exc.ToString(); - - this._logger.LogError(overall, $"Failure writing state for Grain Type {stateName} with Id {grainId}."); - throw; // Definitely throw this error. - } - } - - - async Task IGrainStorage.ClearStateAsync(string stateName, GrainId grainId, IGrainState grainState) - { - if (this._tableClients == null || !this._tableClients.Any()) throw new ArgumentException("GrainState collection not initialized."); - int exConfigIdx = -1; - - try - { - var pk = GetKeyString(grainId); - var connectionIndex = GetShardNumberFromKey(pk); - var rowKey = SanitizeTableProperty(stateName); - exConfigIdx = connectionIndex; - - if (this._storageType == StorageType.TableStorage) - { - var res = await _tableClients[connectionIndex].GetEntityAsync(pk, rowKey); - if (res != null) - { - // NOTE: May wish to update entity with empty data? - await _tableClients[connectionIndex].DeleteEntityAsync(pk, rowKey); - } - } - else if (this._storageType == StorageType.BlobStorage) - { - var key = pk + rowKey; - var containerClient = _blobClients[connectionIndex]; - BlobClient blobClient = containerClient.GetBlobClient(key); - await blobClient.DeleteIfExistsAsync(); - } - else - { - throw new NotImplementedException("type not implemented for read"); - } - - - } - catch (Exception exc) - { - string grainRef = $"Failure CLEARING state for Grain Type {stateName} with Id {grainId}."; - string whereMsg = "unknown location placeholder." + grainRef; - if (exConfigIdx >= 0) - { - var conx = this._options.ConnectionStrings[exConfigIdx]; - var exAcctName = conx.AccountName; - var exAcctType = conx.StorageType.ToString(); - var exTblCtrName = conx.TableOrContainerName; - - whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; - } - - var overall = whereMsg + exc.ToString(); - - this._logger.LogError(overall, $"Failure clearing state for Grain Type {stateName} with Id {grainId}."); - throw; - } - } - - - public Task Close(CancellationToken ct) - { - if (this._storageType == StorageType.TableStorage) - { - this._tableClients = null; - } - else if (this._storageType == StorageType.BlobStorage) - { - - } - else - { - throw new NotImplementedException("type not implemented for read"); - } - - - return Task.CompletedTask; - } - - } - - public static class AzureShardedGrainStorageFactory - { - public static IGrainStorage Create(IServiceProvider services, string name) - { - var options = services.GetRequiredService>().Get(name); - return ActivatorUtilities.CreateInstance(services, options, name); - } - } -} diff --git a/OrleansShardedStorageProvider/AzureShardedStorageConnection.cs b/OrleansShardedStorageProvider/AzureShardedStorageConnection.cs deleted file mode 100644 index 1230b95..0000000 --- a/OrleansShardedStorageProvider/AzureShardedStorageConnection.cs +++ /dev/null @@ -1,109 +0,0 @@ -using Azure; - -namespace OrleansShardedStorageProvider -{ - public class AzureShardedStorageConnection - { - private static string TableStorageNameDefault = "OrleansGrainStateSharded"; - private static string BlobStorageNameDefault = "grainstatesharded"; - - public AzureShardedStorageConnection() - { - - } - - public AzureShardedStorageConnection(string accountName, string sasToken, - StorageType storageType = StorageType.TableStorage) - : this(accountName, sasToken, null, storageType) - { - } - - public AzureShardedStorageConnection(string accountName, - string sasToken, - string tableOrContainerName, - StorageType storageType = StorageType.TableStorage) - { - if (String.IsNullOrWhiteSpace(tableOrContainerName)) - { - if (storageType == StorageType.BlobStorage) - { - tableOrContainerName = BlobStorageNameDefault; - } - else - { - tableOrContainerName = TableStorageNameDefault; - } - } - - AccountName = accountName; - BaseTableUri = new Uri($"https://{accountName}.table.core.windows.net/"); - BaseBlobUri = new Uri($"https://{accountName}.blob.core.windows.net/"); - SasToken = sasToken; - TableOrContainerName = tableOrContainerName; - TableStorageUri = new Uri($"https://{accountName}.table.core.windows.net/{tableOrContainerName}"); - StorageType = storageType; - } - - public Uri GetBaseUri() - { - return this.StorageType == StorageType.TableStorage ? - this.BaseTableUri : this.BaseBlobUri; - } - - /// - /// The base table storage URI (Does not include table name) - /// - public Uri BaseTableUri { get; set; } - - /// - /// The base blob location (does not include container) - /// - public Uri BaseBlobUri { get; set; } - - /// - /// The storage account name e.g. 'storage1' - /// - public string AccountName { get; set; } - - /// - /// The SaS token used to access table or blob storage - /// - public string SasToken { get; set; } - - /// - /// AzureSasCredential generated from SaSToken - /// - public AzureSasCredential SasCredential - { - get - { - return new AzureSasCredential(SasToken); - } - } - - /// - /// For table storage, the table name - /// For blob storage, the container name - /// - public string TableOrContainerName { get; set; } - - /// - /// The full table storage URI (including table name) - /// - public Uri TableStorageUri { get; set; } - - - /// - /// The type of storage - /// (this class can handle connections to table or blob storage) - /// - public StorageType StorageType { get; set; } = StorageType.TableStorage; - } - - - public enum StorageType - { - TableStorage = 0, - BlobStorage = 1, - } -} diff --git a/OrleansShardedStorageProvider/AzureShardedStorageOptions.cs b/OrleansShardedStorageProvider/AzureShardedStorageOptions.cs deleted file mode 100644 index f809ca5..0000000 --- a/OrleansShardedStorageProvider/AzureShardedStorageOptions.cs +++ /dev/null @@ -1,18 +0,0 @@ -using Orleans; -using Orleans.Storage; - -namespace OrleansShardedStorageProvider -{ - - public class AzureShardedStorageOptions : IStorageProviderSerializerOptions - { - //[Redact] -- stops any logging of this info - public List ConnectionStrings { get; set; } - - public int InitStage { get; set; } = DEFAULT_INIT_STAGE; - - public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices; - - public IGrainStorageSerializer GrainStorageSerializer { get; set; } - } -} diff --git a/OrleansShardedStorageProvider/AzureShardedStorageSiloBuilderExtensions.cs b/OrleansShardedStorageProvider/AzureShardedStorageSiloBuilderExtensions.cs deleted file mode 100644 index d485b8c..0000000 --- a/OrleansShardedStorageProvider/AzureShardedStorageSiloBuilderExtensions.cs +++ /dev/null @@ -1,73 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Options; -using Orleans; -using Orleans.Configuration; -using Orleans.Hosting; -using Orleans.Providers; -using Orleans.Runtime; -using Orleans.Storage; - -namespace OrleansShardedStorageProvider -{ - /// - /// Origin: https://github.com/JsAndDotNet/OrleansShardedStorage - /// Ref: Oreleans:src\Azure\Orleans.Persistence.AzureStorage\Hosting\AzureTableSiloBuilderExtensions.cs - /// - public static class AzureShardedStorageSiloBuilderExtensions - { - /// - /// Configure silo to use azure ShardedTable storage as the default grain storage. - /// - public static ISiloBuilder AddAzureShardedGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) - { - return builder.AddAzureShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); - } - - /// - /// Configure silo to use azure ShardedTable storage for grain storage. - /// - public static ISiloBuilder AddAzureShardedGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) - { - return builder.ConfigureServices(services => services.AddAzureShardedGrainStorage(name, ob => ob.Configure(configureOptions))); - } - - - /// - /// Configure silo to use azure ShardedTable storage as the default grain storage. - /// - /// - public static ISiloBuilder AddAzureShardedGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) - { - return builder.AddAzureShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); - } - - - /// - /// Configure silo to use azure ShardedTable storage for grain storage. - /// - public static ISiloBuilder AddAzureShardedGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) - { - return builder.ConfigureServices(services => services.AddAzureShardedGrainStorage(name, configureOptions)); - } - - - internal static IServiceCollection AddAzureShardedGrainStorage( - this IServiceCollection services, - string name, - Action> configureOptions = null) - { - configureOptions?.Invoke(services.AddOptions(name)); - //services.AddTransient(sp => new AzureShardedStorageOptions(sp.GetRequiredService>().Get(name), name)); - //services.AddTransient, DefaultStorageProviderSerializerOptionsConfigurator>(); - services.ConfigureNamedOptionForLogging(name); - if (string.Equals(name, ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, StringComparison.Ordinal)) - { - services.TryAddSingleton(sp => sp.GetServiceByName(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME)); - } - return services.AddSingletonNamedService(name, AzureShardedGrainStorageFactory.Create) - .AddSingletonNamedService>(name, (s, n) => (ILifecycleParticipant)s.GetRequiredServiceByName(n)); - } - - } -} diff --git a/OrleansShardedStorageProvider/Hosting/AzureShardedGrainStorageServiceCollectionExtensions.cs b/OrleansShardedStorageProvider/Hosting/AzureShardedGrainStorageServiceCollectionExtensions.cs new file mode 100644 index 0000000..0246de8 --- /dev/null +++ b/OrleansShardedStorageProvider/Hosting/AzureShardedGrainStorageServiceCollectionExtensions.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Providers; +using Orleans.Runtime.Hosting; +using Orleans.Storage; +using OrleansShardedStorageProvider.Providers; +using OrleansShardedStorageProvider.Storage; + +namespace OrleansShardedStorageProvider.Hosting +{ + + + /// + /// extensions. + /// + public static class AzureShardedGrainStorageServiceCollectionExtensions + { + /// + /// Configures Sharded as the default grain storage provider. + /// + public static IServiceCollection AddShardedGrainStorageAsDefault(this IServiceCollection services, Action configureOptions) + { + return services.AddShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, ob => ob.Configure(configureOptions)); + } + + /// + /// Configures Sharded as a grain storage provider. + /// + public static IServiceCollection AddShardedGrainStorage(this IServiceCollection services, string name, Action configureOptions) + { + return services.AddShardedGrainStorage(name, ob => ob.Configure(configureOptions)); + } + + /// + /// Configures Sharded as the default grain storage provider. + /// + public static IServiceCollection AddShardedGrainStorageAsDefault(this IServiceCollection services, Action> configureOptions = null) + { + return services.AddShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configures Sharded as a grain storage provider. + /// + public static IServiceCollection AddShardedGrainStorage(this IServiceCollection services, string name, + Action> configureOptions = null) + { + configureOptions?.Invoke(services.AddOptions(name)); + services.AddTransient(sp => new AzureShardedStorageOptionsValidator(sp.GetRequiredService>().Get(name), name)); + services.AddTransient, DefaultStorageProviderSerializerOptionsConfigurator>(); + services.ConfigureNamedOptionForLogging(name); + return services.AddGrainStorage(name, AzureShardedGrainStorageFactory.Create); + } + } + +} diff --git a/OrleansShardedStorageProvider/Hosting/AzureShardedSiloBuilderExtensions.cs b/OrleansShardedStorageProvider/Hosting/AzureShardedSiloBuilderExtensions.cs new file mode 100644 index 0000000..3565b77 --- /dev/null +++ b/OrleansShardedStorageProvider/Hosting/AzureShardedSiloBuilderExtensions.cs @@ -0,0 +1,47 @@ +using Microsoft.Extensions.Options; +using Orleans.Providers; +using OrleansShardedStorageProvider.Providers; + +namespace OrleansShardedStorageProvider.Hosting +{ + + /// + /// extensions. + /// + public static class AzureShardedSiloBuilderExtensions + { + /// + /// Configures Sharded as the default grain storage provider. + /// + public static ISiloBuilder AddShardedGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) + { + return builder.AddShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configures Sharded as a grain storage provider. + /// + public static ISiloBuilder AddShardedGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) + { + return builder.ConfigureServices(services => services.AddShardedGrainStorage(name, configureOptions)); + } + + /// + /// Configures Sharded as the default grain storage provider. + /// + public static ISiloBuilder AddShardedGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) + { + return builder.AddShardedGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configures Sharded as a grain storage provider. + /// + public static ISiloBuilder AddShardedGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) + { + return builder.ConfigureServices(services => services.AddShardedGrainStorage(name, configureOptions)); + } + } + + +} diff --git a/OrleansShardedStorageProvider/Models/AzureShardedStorageConnection.cs b/OrleansShardedStorageProvider/Models/AzureShardedStorageConnection.cs new file mode 100644 index 0000000..270b76c --- /dev/null +++ b/OrleansShardedStorageProvider/Models/AzureShardedStorageConnection.cs @@ -0,0 +1,110 @@ +using Azure; + +namespace OrleansShardedStorageProvider.Models +{ + + public class AzureShardedStorageConnection + { + private static string TableStorageNameDefault = "OrleansGrainStateSharded"; + private static string BlobStorageNameDefault = "grainstatesharded"; + + public AzureShardedStorageConnection() + { + + } + + public AzureShardedStorageConnection(string accountName, string sasToken, + StorageType storageType = StorageType.TableStorage) + : this(accountName, sasToken, null, storageType) + { + } + + public AzureShardedStorageConnection(string accountName, + string sasToken, + string tableOrContainerName, + StorageType storageType = StorageType.TableStorage) + { + if (String.IsNullOrWhiteSpace(tableOrContainerName)) + { + if (storageType == StorageType.BlobStorage) + { + tableOrContainerName = BlobStorageNameDefault; + } + else + { + tableOrContainerName = TableStorageNameDefault; + } + } + + AccountName = accountName; + BaseTableUri = new Uri($"https://{accountName}.table.core.windows.net/"); + BaseBlobUri = new Uri($"https://{accountName}.blob.core.windows.net/"); + SasToken = sasToken; + TableOrContainerName = tableOrContainerName; + TableStorageUri = new Uri($"https://{accountName}.table.core.windows.net/{tableOrContainerName}"); + StorageType = storageType; + } + + public Uri GetBaseUri() + { + return this.StorageType == StorageType.TableStorage ? + this.BaseTableUri : this.BaseBlobUri; + } + + /// + /// The base table storage URI (Does not include table name) + /// + public Uri BaseTableUri { get; set; } + + /// + /// The base blob location (does not include container) + /// + public Uri BaseBlobUri { get; set; } + + /// + /// The storage account name e.g. 'storage1' + /// + public string AccountName { get; set; } + + /// + /// The SaS token used to access table or blob storage + /// + public string SasToken { get; set; } + + /// + /// AzureSasCredential generated from SaSToken + /// + public AzureSasCredential SasCredential + { + get + { + return new AzureSasCredential(SasToken); + } + } + + /// + /// For table storage, the table name + /// For blob storage, the container name + /// + public string TableOrContainerName { get; set; } + + /// + /// The full table storage URI (including table name) + /// + public Uri TableStorageUri { get; set; } + + + /// + /// The type of storage + /// (this class can handle connections to table or blob storage) + /// + public StorageType StorageType { get; set; } = StorageType.TableStorage; + } + + + public enum StorageType + { + TableStorage = 0, + BlobStorage = 1, + } +} diff --git a/OrleansShardedStorageProvider/OrleansShardedStorageProvider.csproj b/OrleansShardedStorageProvider/OrleansShardedStorageProvider.csproj index 05b6e89..ba08fa6 100644 --- a/OrleansShardedStorageProvider/OrleansShardedStorageProvider.csproj +++ b/OrleansShardedStorageProvider/OrleansShardedStorageProvider.csproj @@ -1,18 +1,18 @@  - net7.0 + net8.0 enable enable - - - - - - + + + + + + diff --git a/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptions.cs b/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptions.cs new file mode 100644 index 0000000..2dfef53 --- /dev/null +++ b/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptions.cs @@ -0,0 +1,25 @@ +using Orleans.Storage; +using OrleansShardedStorageProvider.Models; + +namespace OrleansShardedStorageProvider.Providers +{ + /// + /// AzureShardedStorage grain storage options. + /// + public class AzureShardedStorageOptions : IStorageProviderSerializerOptions + { + + //[Redact] -- stops any logging of this info + public List ConnectionStrings { get; set; } + + public int InitStage { get; set; } = DEFAULT_INIT_STAGE; + + public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices; + + public IGrainStorageSerializer GrainStorageSerializer { get; set; } + + } + + + +} diff --git a/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptionsValidator.cs b/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptionsValidator.cs new file mode 100644 index 0000000..e9dad5e --- /dev/null +++ b/OrleansShardedStorageProvider/Providers/AzureShardedStorageOptionsValidator.cs @@ -0,0 +1,27 @@ +using Orleans.Runtime; +using OrleansShardedStorageProvider.Storage; + +namespace OrleansShardedStorageProvider.Providers +{ + + internal class AzureShardedStorageOptionsValidator : IConfigurationValidator + { + private readonly AzureShardedStorageOptions _options; + private readonly string _name; + + public AzureShardedStorageOptionsValidator(AzureShardedStorageOptions options, string name) + { + _options = options; + _name = name; + } + + public void ValidateConfiguration() + { + if (_options.ConnectionStrings == null) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(AzureShardedGrainStorage)} with name {_name}. {nameof(AzureShardedStorageOptions)}.{nameof(_options.ConnectionStrings)} are required."); + } + } + } + +} diff --git a/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorage.cs b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorage.cs new file mode 100644 index 0000000..0f264e4 --- /dev/null +++ b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorage.cs @@ -0,0 +1,431 @@ +using Azure; +using Azure.Data.Tables; +using Azure.Identity; +using Azure.Storage.Blobs; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Orleans.Configuration; +using Orleans.Runtime; +using Orleans.Storage; +using OrleansShardedStorageProvider.Models; +using OrleansShardedStorageProvider.Providers; +using System.Diagnostics; +using System.Text; + +namespace OrleansShardedStorageProvider.Storage +{ + + public class AzureShardedGrainStorage : AzureShardedGrainStorageBase, IGrainStorage, ILifecycleParticipant + { + // ServiceId and Options are stored in the base class. + private readonly string _name; + private readonly ILogger _logger; + private List _tableClients = new List(); + private List _blobClients = new List(); + private StorageType _storageType = StorageType.TableStorage; + + /// + /// Creates a new instance of the type. + /// + public AzureShardedGrainStorage( + string name, + AzureShardedStorageOptions options, + IGrainStorageSerializer grainStorageSerializer, + IOptions clusterOptions, + ILogger logger) : + base(clusterOptions, options) + { + _name = name; + _logger = logger; + + } + + /// + public void Participate(ISiloLifecycle lifecycle) + { + var name = OptionFormattingUtilities.Name(_name); + lifecycle.Subscribe(name, _options.InitStage, Init, Close); + } + + private async Task Init(CancellationToken cancellationToken) + { + var timer = Stopwatch.StartNew(); + + // This is required to give more detailed logging if it errors. + int exConfigIdx = -1; + + try + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "ShardedGrainStorage {Name} is initializing: ServiceId={ServiceId}", + _name, + _serviceId); + } + + // Own content + + + var initMsg = string.Format("Init: Name={0} ServiceId={1}", this._name, this._serviceId); + + this._logger.LogInformation($"Azure File Storage Grain Storage {this._name} is initializing: {initMsg}"); + + foreach (var storage in this._options.ConnectionStrings) + { + exConfigIdx++; + + if (storage.StorageType == StorageType.TableStorage) + { + _storageType = StorageType.TableStorage; + + var shareClient = String.IsNullOrEmpty(storage.SasToken) ? + new TableServiceClient(storage.BaseTableUri, new DefaultAzureCredential()) : + new TableServiceClient(storage.BaseTableUri, new AzureSasCredential(storage.SasToken)); + + var table = await shareClient.CreateTableIfNotExistsAsync(storage.TableOrContainerName); + + + var tableClient = new TableClient( + storage.TableStorageUri, + new AzureSasCredential(storage.SasToken)); + + this._tableClients.Add(tableClient); + } + else if (storage.StorageType == StorageType.BlobStorage) + { + _storageType = StorageType.BlobStorage; + + BlobServiceClient blobServiceClient = (null == storage.SasCredential) ? + new BlobServiceClient(storage.BaseBlobUri, new DefaultAzureCredential()) : + new BlobServiceClient(storage.BaseBlobUri, storage.SasCredential); + + var containerClient = blobServiceClient.GetBlobContainerClient(storage.TableOrContainerName); + await containerClient.CreateIfNotExistsAsync(); + + this._blobClients.Add(containerClient); + } + else + { + throw new NotImplementedException("type not implmeneted"); + } + } + + + //end own content + + if (_logger.IsEnabled(LogLevel.Debug)) + { + timer.Stop(); + _logger.LogDebug( + "Init: Name={Name} ServiceId={ServiceId}, initialized in {ElapsedMilliseconds} ms", + _name, + _serviceId, + timer.Elapsed.TotalMilliseconds.ToString("0.00")); + } + } + catch (Exception ex) + { + timer.Stop(); + + string whereString = "where unknown placeholder"; + if (exConfigIdx >= 0) + { + var excon = this._options.ConnectionStrings[exConfigIdx]; + var exStorageAcct = excon.AccountName; + var exStorageType = excon.StorageType.ToString(); + whereString = $"CN:{exConfigIdx},Name:{exStorageAcct},Type:{exStorageType}. "; + } + + this._logger.LogError($"{whereString}. Initialization failed for provider {this._name} of type {this.GetType().Name} in stage {this._options.InitStage} in {timer.Elapsed.TotalMilliseconds.ToString()} Milliseconds.", ex); + + + throw new AzureShardedStorageException($"{ex.GetType()}: {ex.Message}"); + } + } + + /// + public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + if (this._storageType == StorageType.TableStorage && + (this._tableClients == null || !this._tableClients.Any())) throw new ArgumentException("GrainState collection not initialized."); + + // This is required to give more detailed logging if it errors. + int exConfigIdx = -1; + + try + { + // NOTE: grainId does not always match the number expected for int keys, but they are consistent + var pk = GetKeyString(grainId); + var connectionIndex = GetShardNumberFromKey(pk); + var rowKey = SanitizeTableProperty(grainType); + exConfigIdx = connectionIndex; + + if (this._storageType == StorageType.TableStorage) + { + // NOTE: This will error if the row doesn't exist - it's disputed functionality from the Azure team + // In orleans, they just swallow the error, so we're doing the same + // See discussion here - https://github.com/Azure/azure-sdk-for-net/issues/16251 + // and Orleans Code here - {orleans}\src\Azure\Shared\Storage\AzureTableDataManager.cs Method ReadSingleTableEntryAsync + // This is quicker than Query once a row is there, so what we lose to start, we more than gain in speed later. Don't change it! + var res = await _tableClients[connectionIndex].GetEntityAsync(pk, rowKey); + + if (res != null) + { + var stringData = res.Value["StringData"].ToString(); + + if (!String.IsNullOrWhiteSpace(stringData)) + { + using (JsonTextReader jsonReader = + new JsonTextReader(new StringReader(stringData))) + { + JsonSerializer ser = new JsonSerializer(); + grainState.State = ser.Deserialize(jsonReader); + } + } + + grainState.RecordExists = grainState.State != null; + grainState.ETag = res.Value.ETag.ToString(); + } + + if (grainState.State == null) + { + grainState.State = Activator.CreateInstance(); + grainState.RecordExists = true; + } + } + else if (this._storageType == StorageType.BlobStorage) + { + var key = pk + "_" + rowKey; + var containerClient = _blobClients[connectionIndex]; + BlobClient blobClient = containerClient.GetBlobClient(key); + + var exists = await blobClient.ExistsAsync(); + + if (exists) + { + var download = await blobClient.DownloadContentAsync(); + BinaryData binData = download.Value.Content; + + //var bytes = binData.ToArray(); + var stringData = Encoding.UTF8.GetString(binData); + + if (!String.IsNullOrWhiteSpace(stringData)) + { + using (JsonTextReader jsonReader = + new JsonTextReader(new StringReader(stringData))) + { + JsonSerializer ser = new JsonSerializer(); + grainState.State = ser.Deserialize(jsonReader); + } + } + + grainState.RecordExists = grainState.State != null; + // Note: ETag is important for optimistic concurrency + grainState.ETag = download.Value.Details.ETag.ToString(); + } + } + else + { + throw new NotImplementedException("type not implemented for read"); + } + } + catch (Exception exc) + { + var errorString = exc.ToString(); + + // See comments above for GetEntityAsync error details + if (errorString.Contains("The specified resource does not exist") || + errorString.Contains("The specified blob does not exist")) + { + // We expect this error. There's nothing we can do about it. See comments above. + } + else + { + string grainRef = $"Failure reading state for Grain Type {grainType} with Id {grainId}."; + string whereMsg = "unknown location placeholder." + grainRef; + if (exConfigIdx >= 0) + { + var conx = this._options.ConnectionStrings[exConfigIdx]; + var exAcctName = conx.AccountName; + var exAcctType = conx.StorageType.ToString(); + var exTblCtrName = conx.TableOrContainerName; + + whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; + } + + var overall = whereMsg + exc.ToString(); + + this._logger.LogError(overall, grainRef); + throw; + } + } + } + + /// + public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + if (this._storageType == StorageType.TableStorage && (this._tableClients == null || !this._tableClients.Any())) throw new ArgumentException("GrainState collection not initialized."); + if (this._storageType == StorageType.BlobStorage && (this._blobClients == null || !this._blobClients.Any())) throw new ArgumentException("GrainState collection not initialized."); + + // This is required to give more detailed logging if it errors. + int exConfigIdx = -1; + + try + { + // NOTE: grainId does not always match the number expected for int keys, but they are consistent + var pk = GetKeyString(grainId); + var connectionIndex = GetShardNumberFromKey(pk); + var rowKey = SanitizeTableProperty(grainType); + exConfigIdx = connectionIndex; + + if (this._storageType == StorageType.TableStorage) + { + var entity = new TableEntity(pk, rowKey) + { + ETag = new ETag(grainState.ETag), + }; + + JsonSerializer ser = new JsonSerializer(); + StringBuilder sb = new StringBuilder(); + using (StringWriter sw = new StringWriter(sb)) + using (JsonWriter writer = new JsonTextWriter(sw)) + { + ser.Serialize(writer, grainState.State); + } + + entity["StringData"] = sb.ToString(); + + // TODO: LEARN - Should we check ETag and decide whether to update based on it??? + var opResult = await this._tableClients[connectionIndex].UpsertEntityAsync(entity); + // Note: ETag is important for optimistic concurrency + grainState.ETag = opResult.Headers.ETag.GetValueOrDefault().ToString(); + grainState.RecordExists = true; + } + else if (this._storageType == StorageType.BlobStorage) + { + JsonSerializer ser = new JsonSerializer(); + StringBuilder sb = new StringBuilder(); + using (StringWriter sw = new StringWriter(sb)) + using (JsonWriter writer = new JsonTextWriter(sw)) + { + ser.Serialize(writer, grainState.State); + } + + var rawContent = sb.ToString(); + var bytes = Encoding.UTF8.GetBytes(rawContent); + BinaryData binaryData = new BinaryData(bytes); + + var key = pk + "_" + rowKey; + var containerClient = _blobClients[connectionIndex]; + BlobClient blobClient = containerClient.GetBlobClient(key); + var upload = await blobClient.UploadAsync(binaryData, overwrite: true); + // Note: ETag is important for optimistic concurrency + grainState.ETag = upload.Value.ETag.ToString(); + grainState.RecordExists = true; + } + else + { + throw new NotImplementedException("type not implemented for read"); + } + } + catch (Exception exc) + { + string grainRef = $"Failure WRITING state for Grain Type {grainType} with Id {grainId}."; + string whereMsg = "unknown location placeholder." + grainRef; + if (exConfigIdx >= 0) + { + var conx = this._options.ConnectionStrings[exConfigIdx]; + var exAcctName = conx.AccountName; + var exAcctType = conx.StorageType.ToString(); + var exTblCtrName = conx.TableOrContainerName; + + whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; + } + + var overall = whereMsg + exc.ToString(); + + this._logger.LogError(overall, $"Failure writing state for Grain Type {grainType} with Id {grainId}."); + throw; // Definitely throw this error. + } + } + + + + /// + public async Task ClearStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + if (this._tableClients == null || !this._tableClients.Any()) throw new ArgumentException("GrainState collection not initialized."); + int exConfigIdx = -1; + + try + { + var pk = GetKeyString(grainId); + var connectionIndex = GetShardNumberFromKey(pk); + var rowKey = SanitizeTableProperty(grainType); + exConfigIdx = connectionIndex; + + if (this._storageType == StorageType.TableStorage) + { + var res = await _tableClients[connectionIndex].GetEntityAsync(pk, rowKey); + if (res != null) + { + // NOTE: May wish to update entity with empty data? + await _tableClients[connectionIndex].DeleteEntityAsync(pk, rowKey); + } + } + else if (this._storageType == StorageType.BlobStorage) + { + var key = pk + rowKey; + var containerClient = _blobClients[connectionIndex]; + BlobClient blobClient = containerClient.GetBlobClient(key); + await blobClient.DeleteIfExistsAsync(); + } + else + { + throw new NotImplementedException("type not implemented for read"); + } + + + } + catch (Exception exc) + { + string grainRef = $"Failure CLEARING state for Grain Type {grainType} with Id {grainId}."; + string whereMsg = "unknown location placeholder." + grainRef; + if (exConfigIdx >= 0) + { + var conx = this._options.ConnectionStrings[exConfigIdx]; + var exAcctName = conx.AccountName; + var exAcctType = conx.StorageType.ToString(); + var exTblCtrName = conx.TableOrContainerName; + + whereMsg = $"Idx:{exConfigIdx},Acct:{exAcctName},Type:{exAcctType},TblCtr:{exTblCtrName}. {grainRef}. "; + } + + var overall = whereMsg + exc.ToString(); + + this._logger.LogError(overall, $"Failure clearing state for Grain Type {grainType} with Id {grainId}."); + throw; + } + } + + private async Task Close(CancellationToken cancellationToken) + { + if (this._storageType == StorageType.TableStorage) + { + this._tableClients = null; + } + else if (this._storageType == StorageType.BlobStorage) + { + this._blobClients = null; + } + else + { + throw new NotImplementedException("type not implemented for read"); + } + } + } + + +} diff --git a/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageBase.cs b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageBase.cs new file mode 100644 index 0000000..a6213d9 --- /dev/null +++ b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageBase.cs @@ -0,0 +1,83 @@ +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Runtime; +using OrleansShardedStorageProvider.Providers; + +namespace OrleansShardedStorageProvider.Storage +{ + public class AzureShardedGrainStorageBase + { + protected readonly string _serviceId; + protected readonly AzureShardedStorageOptions _options; + + public AzureShardedGrainStorageBase(IOptions clusterOptions, AzureShardedStorageOptions options) + { + _serviceId = clusterOptions.Value.ServiceId; + _options = options; + } + + public int GetShardNumberFromKey(string pk) + { + var hash = GetStableHashCode(pk); + var storageNum = Math.Abs(hash % this._options.ConnectionStrings.Count()); + + return storageNum; + } + + /// + /// Take from https://stackoverflow.com/a/36845864/852806 + /// + /// + /// + protected int GetStableHashCode(string str) + { + unchecked + { + int hash1 = 5381; + int hash2 = hash1; + + for (int i = 0; i < str.Length && str[i] != '\0'; i += 2) + { + hash1 = ((hash1 << 5) + hash1) ^ str[i]; + if (i == str.Length - 1 || str[i + 1] == '\0') + break; + hash2 = ((hash2 << 5) + hash2) ^ str[i + 1]; + } + + return hash1 + (hash2 * 1566083941); + } + } + + + protected const string KeyStringSeparator = "__"; + + public string GetKeyStringSeparator() + { + return KeyStringSeparator; + } + + protected string GetKeyString(GrainId grainId) + { + var key = $"{this._serviceId}{KeyStringSeparator}{grainId.ToString()}"; + + return SanitizeTableProperty(key); + } + + protected string SanitizeTableProperty(string key) + { + // Remove any characters that can't be used in Azure PartitionKey or RowKey values + // http://www.jamestharpe.com/web-development/azure-table-service-character-combinations-disallowed-in-partitionkey-rowkey/ + key = key + .Replace('/', '_') // Forward slash + .Replace('\\', '_') // Backslash + .Replace('#', '_') // Pound sign + .Replace('?', '_'); // Question mark + + if (key.Length >= 1024) + throw new ArgumentException(string.Format("Key length {0} is too long to be an Azure table key. Key={1}", key.Length, key)); + + return key; + } + } + +} diff --git a/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageFactory.cs b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageFactory.cs new file mode 100644 index 0000000..b499579 --- /dev/null +++ b/OrleansShardedStorageProvider/Storage/AzureShardedGrainStorageFactory.cs @@ -0,0 +1,24 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OrleansShardedStorageProvider.Providers; + +namespace OrleansShardedStorageProvider.Storage +{ + + /// + /// Factory used to create instances of AzureShardedStorage + /// + public static class AzureShardedGrainStorageFactory + { + /// + /// Creates a grain storage instance. + /// + public static AzureShardedGrainStorage Create(IServiceProvider services, string name) + { + var optionsMonitor = services.GetRequiredService>(); + var grainStorage = ActivatorUtilities.CreateInstance(services, name, optionsMonitor.Get(name)); + return grainStorage; + } + } + +} diff --git a/OrleansShardedStorageProvider/Storage/AzureShardedStorageException.cs b/OrleansShardedStorageProvider/Storage/AzureShardedStorageException.cs new file mode 100644 index 0000000..ace0a8a --- /dev/null +++ b/OrleansShardedStorageProvider/Storage/AzureShardedStorageException.cs @@ -0,0 +1,35 @@ +namespace OrleansShardedStorageProvider.Storage +{ + /// + /// Exception for throwing from AzureShardedStorage grain storage. + /// + [GenerateSerializer] + public class AzureShardedStorageException : Exception + { + /// + /// Initializes a new instance of . + /// + public AzureShardedStorageException() + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + public AzureShardedStorageException(string message) : base(message) + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. + public AzureShardedStorageException(string message, Exception inner) : base(message, inner) + { + } + + + } +} diff --git a/README.md b/README.md index a00b7fa..eec3442 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,21 @@ # Orleans Sharded Storage Provider (TABLES AND BLOBS) -This is for Orleans 7+. See here for the deprecated [3.6.2 version](https://github.com/JsAndDotNet/OrleansShardedStorage/tree/Orleans-3.6.2). +This is for Orleans 8+. + +- See here for the deprecated [Version 7](https://github.com/JsAndDotNet/OrleansShardedStorage/tree/Orleans-7) +- See here for the deprecated [3.6.2 version](https://github.com/JsAndDotNet/OrleansShardedStorage/tree/Orleans-3.6.2). + + --- +# Upgrading from Version 7 to Version 8 + +Due to issues upgrading, I've changed this to match the pattern used by the [Orleans Project](https://github.com/dotnet/orleans/tree/main/src/Redis/Orleans.Persistence.Redis) itself. + +This has added some additional namespace resolution requirements. Just right click the classes and select the appropriate namespace to resolve them. + + # What is it? Azure Storage has [request per second limitations](https://learn.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account) per storage account, which can easily be hit in high throughput applications. diff --git a/TestApplication/Client/Client.csproj b/TestApplication/Client/Client.csproj index 9f65a72..4111879 100644 --- a/TestApplication/Client/Client.csproj +++ b/TestApplication/Client/Client.csproj @@ -2,15 +2,15 @@ Exe - net7.0 + net8.0 enable enable - - - + + + diff --git a/TestApplication/GrainInterfaces/GrainInterfaces.csproj b/TestApplication/GrainInterfaces/GrainInterfaces.csproj index 5bf259b..7197be2 100644 --- a/TestApplication/GrainInterfaces/GrainInterfaces.csproj +++ b/TestApplication/GrainInterfaces/GrainInterfaces.csproj @@ -1,13 +1,13 @@ - + - net7.0 + net8.0 enable enable - + diff --git a/TestApplication/Grains/Grains.csproj b/TestApplication/Grains/Grains.csproj index 32151da..aa44b67 100644 --- a/TestApplication/Grains/Grains.csproj +++ b/TestApplication/Grains/Grains.csproj @@ -1,15 +1,15 @@ - + - net7.0 + net8.0 enable enable - - - + + + diff --git a/TestApplication/Silo/Program.cs b/TestApplication/Silo/Program.cs index 2902949..fabda51 100644 --- a/TestApplication/Silo/Program.cs +++ b/TestApplication/Silo/Program.cs @@ -4,7 +4,10 @@ using Orleans.Configuration; using Orleans.Runtime; -using OrleansShardedStorageProvider; + +using OrleansShardedStorageProvider.Models; +using OrleansShardedStorageProvider.Providers; +using OrleansShardedStorageProvider.Hosting; using Silo.Config; try @@ -60,11 +63,11 @@ static async Task StartSiloAsync() .UseOrleans(c => { c.UseLocalhostClustering() - .AddAzureShardedGrainStorage("ShardedTableStorageStore", opt => + .AddShardedGrainStorage("ShardedTableStorageStore", opt => { opt.ConnectionStrings = tableGrainStores; }) - .AddAzureShardedGrainStorage("ShardedBlobStorageStore", opt => + .AddShardedGrainStorage("ShardedBlobStorageStore", opt => { opt.ConnectionStrings = blobGrainStores; }) diff --git a/TestApplication/Silo/Silo.csproj b/TestApplication/Silo/Silo.csproj index 5d8f018..c9769d7 100644 --- a/TestApplication/Silo/Silo.csproj +++ b/TestApplication/Silo/Silo.csproj @@ -2,20 +2,21 @@ Exe - net7.0 + net8.0 enable enable c6b62504-65d4-401d-9360-5837734280ca - - - - + + + + + diff --git a/TestApplication/UnitTests/UnitTests.csproj b/TestApplication/UnitTests/UnitTests.csproj index c6c6a1e..bdedea9 100644 --- a/TestApplication/UnitTests/UnitTests.csproj +++ b/TestApplication/UnitTests/UnitTests.csproj @@ -1,22 +1,25 @@  - net7.0 + net8.0 enable enable - - - - - - - - - - + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + diff --git a/ZDataFinder/StorageDataFinder.cs b/ZDataFinder/StorageDataFinder.cs index 5a4dde6..753574e 100644 --- a/ZDataFinder/StorageDataFinder.cs +++ b/ZDataFinder/StorageDataFinder.cs @@ -14,6 +14,8 @@ using System.Collections.Concurrent; using Microsoft.Identity.Client; using System.Xml; +using OrleansShardedStorageProvider.Providers; +using OrleansShardedStorageProvider.Models; namespace ZDataFinder { diff --git a/ZDataFinder/ZDataFinder.csproj b/ZDataFinder/ZDataFinder.csproj index ac94195..c4354be 100644 --- a/ZDataFinder/ZDataFinder.csproj +++ b/ZDataFinder/ZDataFinder.csproj @@ -9,7 +9,7 @@ - +