Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

数据访问层添加主从数据库(读写分离)架构支持 #217

Closed
gmf520 opened this issue Mar 21, 2021 · 0 comments
Closed

数据访问层添加主从数据库(读写分离)架构支持 #217

gmf520 opened this issue Mar 21, 2021 · 0 comments
Labels
Feature 🔨 新功能,新特性 Finished ✔️ 实现并完工
Milestone

Comments

@gmf520
Copy link
Member

gmf520 commented Mar 21, 2021

[toc]

主从数据库(读写分离)设计

需求分析

  • 将单一数据库的部属方式更新为主从(master-slave)结构的部属方式,将读写分离,解决单一数据库处理能力不足的问题
  • 主从数据库之间通过数据库自身的支持进行数据单向(主 --> 从)同步
  • 当一个操作存在CUD(新增、更新、删除)的(事务)操作时,走主数据库
  • 当一个操作只存在读取操作,并且不能忍受数据延迟时,走主数据库
  • 当一个操作只存在读取操作,并且可以容忍一定的数据延迟时,可以走从数据库(大多数前台数据都是此类型)
  • 是否走从数据库,应当可配置可控
  • 是否走从数据库,对于业务实现应该是无感的
  • 实现一些从数据库的选择决策算法(随机,顺序,权重等)

需求实现

主从数据库配置选项

从数据库配置选项

    /// <summary>
    /// 从数据库选项
    /// </summary>
    public class SlaveDatabaseOptions : DataErrorInfoBase
    {
        /// <summary>
        /// 获取或设置 权重
        /// </summary>
        public int Weight { get; set; }

        /// <summary>
        /// 获取或设置 数据库连接串
        /// </summary>
        public string ConnectionString { get; set; }
    }

在数据上下文配置中使用,数据上下文配置中,配置了主库连接字符串ConnectionString,从数据库选项集合Slaves,从数据库选择策略名SlaveSelectorName

    /// <summary>
    /// 数据上下文配置节点
    /// </summary>
    public class OsharpDbContextOptions
    {
        // 其他属性
        
        /// <summary>
        /// 获取或设置 主数据库连接字符串
        /// </summary>
        public string ConnectionString { get; set; }
        
        /// <summary>
        /// 获取或设置 从数据库轮询策略
        /// </summary>
        public string SlaveSelectorName { get; set; }

        /// <summary>
        /// 获取或设置 从数据库选项集合
        /// </summary>
        public SlaveDatabaseOptions[] Slaves { get; set; }

        // 其他属性
    }

相应的appsetting.json中的JSON配置节点

{
  "OSharp": {
    "DbContexts": {
      "SqlServer": {
        "DbContextTypeName": "OSharp.Entity.DefaultDbContext,OSharp.EntityFrameworkCore",
        "ConnectionString": "Server=localhost;Database=osharpns-dev-api;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true",
        "Slaves": [
          {
            "Name": "Slave01",
            "Weight": 2,
            "ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave01;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
          },
          {
            "Name": "Slave02",
            "Weight": 5,
            "ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave02;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
          }
        ],
        "SlaveSelectorName": "Weight",
        "DatabaseType": "SqlServer",
        "AuditEntityEnabled": true,
        "AutoMigrationEnabled": true
      }
    }
  }
}

主从分离数据存储实现

实现思路:

构建数据上下文DbContext

构建数据上下文选项构建者DbContextOptionsBuilder应用具体的数据库驱动平台

数据库连接字符串提供者IConnectionStringProvider,动态获取数据库连接字符串

数据库主从分离策略IMasterSlaveSplitPolicy,决定走主数据库还是走从数据库

返回主数据库连接字符串

从数据库选择器ISlaveDatabaseSelector,从多个从数据库中选择一个执行数据读取

从数据库随机选择器RandomSlaveDatabaseSelector

从数据库顺序选择器SequenceSlaveDatabaseSelector

从数据库滑动加权选择器WeightSlaveDatabaseSelector

返回从数据库连接字符串

流程图:

具体实现:

业务实现的时候从主从分离的数据库中进行数据存取操作,用哪个数据库,主要就是取决于数据库连接字符串的选择,因此,只要在创建数据上下文DbContext实例之前决定好数据库连接字符串,就能动态选择不同的数据库。

OSharp 框架是在构建DbContextOptionsBuilder的时候使用数据库连接字符串的,定义了一个**数据库连接字符串提供者IConnectionStringProvider**接口来动态提供要连接的数据库连接字符串

/// <summary>
/// 构建<see cref="DbContextOptionsBuilder"/>,附加必要的扩展属性
/// </summary>
public static DbContextOptionsBuilder BuildDbContextOptionsBuilder<TDbContext>(this IServiceProvider provider, DbContextOptionsBuilder builder) where TDbContext : DbContext
{
	...

    IDbContextOptionsBuilderDriveHandler driveHandler = provider.GetServices<IDbContextOptionsBuilderDriveHandler>()
    	.LastOrDefault(m => m.Type == databaseType);
    if (driveHandler == null)
    {
    throw new OsharpException($"无法解析类型为 {databaseType}{typeof(IDbContextOptionsBuilderDriveHandler).DisplayName()} 实例");
    }

/*
	//旧代码
    ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
    string key = $"DbConnection_{osharpDbContextOptions.ConnectionString}";
    DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
    builder = driveHandler.Handle(builder, osharpDbContextOptions.ConnectionString, existingDbConnection);
*/

	//新代码
    IConnectionStringProvider connectionStringProvider = provider.GetRequiredService<IConnectionStringProvider>();
    string connectionString = connectionStringProvider.GetConnectionString(typeof(TDbContext));
    ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
    string key = $"DbConnection_{connectionString}";
    DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
    builder = driveHandler.Handle(builder, connectionString, existingDbConnection);

	...
}

数据库连接字符串提供者IConnectionStringProvider

    /// <summary>
    /// 数据库连接字符串提供器
    /// </summary>
    public interface IConnectionStringProvider
    {
        /// <summary>
        /// 获取指定数据上下文类型的数据库连接字符串
        /// </summary>
        /// <param name="dbContextType">数据上下文类型</param>
        /// <returns></returns>
        string GetConnectionString(Type dbContextType);
    }

IConnectionStringProvider实现类ConnectionStringProvider

    /// <summary>
    /// 数据库连接字符串提供者
    /// </summary>
    public class ConnectionStringProvider : IConnectionStringProvider
    {
        private readonly IDictionary<string, OsharpDbContextOptions> _dbContexts;
        private readonly ISlaveDatabaseSelector[] _slaveDatabaseSelectors;
        private readonly IMasterSlaveSplitPolicy _masterSlavePolicy;

        /// <summary>
        /// 初始化一个<see cref="ConnectionStringProvider"/>类型的新实例
        /// </summary>
        public ConnectionStringProvider(IServiceProvider provider)
        {
            _dbContexts = provider.GetOSharpOptions().DbContexts;
            _masterSlavePolicy = provider.GetService<IMasterSlaveSplitPolicy>();
            _slaveDatabaseSelectors = provider.GetServices<ISlaveDatabaseSelector>().ToArray();
        }

        /// <summary>
        /// 获取指定数据上下文类型的数据库连接字符串
        /// </summary>
        /// <param name="dbContextType">数据上下文类型</param>
        /// <returns></returns>
        public virtual string GetConnectionString(Type dbContextType)
        {
            OsharpDbContextOptions dbContextOptions = _dbContexts.Values.FirstOrDefault(m => m.DbContextType == dbContextType);
            if (dbContextOptions == null)
            {
                throw new OsharpException($"数据上下文“{dbContextType}”的数据上下文配置信息不存在");
            }

            bool isSlave = _masterSlavePolicy.IsToSlaveDatabase(dbContextOptions);
            if (!isSlave)
            {
                return dbContextOptions.ConnectionString;
            }
            
            SlaveDatabaseOptions[] slaves = dbContextOptions.Slaves;
            ISlaveDatabaseSelector slaveDatabaseSelector = _slaveDatabaseSelectors.LastOrDefault(m => m.Name == dbContextOptions.SlaveSelectorName)
                ?? _slaveDatabaseSelectors.First(m => m.Name == "Weight");
            SlaveDatabaseOptions slave = slaveDatabaseSelector.Select(slaves);
            return slave.ConnectionString;
        }
    }

数据库主从分离策略IMasterSlaveSplitPolicy

    /// <summary>
    /// 定义数据库主从分离策略
    /// </summary>
    public interface IMasterSlaveSplitPolicy
    {
        /// <summary>
        /// 是否前往从数据库
        /// </summary>
        /// <param name="options">数据上下文选项</param>
        /// <returns></returns>
        bool IsToSlaveDatabase(OsharpDbContextOptions options);
    }

IMasterSlaveSplitPolicy实现类MasterSlaveSplitPolicy

主从分离策略,默认流程如下:

上下文配置中从数据库配置不存在

工作单元启用了事务

当前执行Function为空或者Function上没有设置走从库

走主数据库

Function设置走从库

走从数据库

作为业务功能描述的Function信息中,有一个选项配置Function.IsSlaveDatabase是否走从库

只有只读业务,并且能忍受一定的数据延迟的,才应配置为走从库

如果业务涉及新增、更新、删除操作,默认策略将忽略IsSlaveDatabase配置

    /// <summary>
    /// 定义功能信息
    /// </summary>
    public interface IFunction : IEntity<Guid>, ILockable, IEntityHash
    {
    	//...
    	
        /// <summary>
        /// 获取或设置 是否从库读取数据
        /// </summary>
        bool IsSlaveDatabase { get; set; }
    	    	
    	//...
    }

默认主从分离策略实现MasterSlaveSplitPolicy

    /// <summary>
    /// 主从分离策略
    /// </summary>
    internal class MasterSlaveSplitPolicy : IMasterSlaveSplitPolicy
    {
        private readonly IUnitOfWork _unitOfWork;
        private readonly ScopedDictionary _scopedDict;

        /// <summary>
        /// 初始化一个<see cref="MasterSlaveSplitPolicy"/>类型的新实例
        /// </summary>
        public MasterSlaveSplitPolicy(IServiceProvider provider)
        {
            _unitOfWork = provider.GetUnitOfWork(false);
            _scopedDict = provider.GetRequiredService<ScopedDictionary>();
        }

        /// <summary>
        /// 是否前往从数据库
        /// </summary>
        /// <param name="options">数据上下文选项</param>
        /// <returns></returns>
        public bool IsToSlaveDatabase(OsharpDbContextOptions options)
        {
            SlaveDatabaseOptions[] slaves = options.Slaves;
            if (slaves.IsNullOrEmpty())
            {
                return false;
            }

            //允许工作单元事务,走主库
            if (_unitOfWork.IsEnabledTransaction)
            {
                return false;
            }

            // 在Function显式配置走从库,才走从库
            IFunction function = _scopedDict.Function;
            if (function == null || !function.IsSlaveDatabase)
            {
                return false;
            }

            return true;
        }
    }

从数据库选择器ISlaveDatabaseSelector

    /// <summary>
    /// 定义从数据库选择功能
    /// </summary>
    [MultipleDependency]
    public interface ISlaveDatabaseSelector
    {
        /// <summary>
        /// 获取 名称
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves);
    }
从数据库随机选择器RandomSlaveDatabaseSelector
    /// <summary>
    /// 随机从数据库选择器
    /// </summary>
    public sealed class RandomSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly Random Random = new Random();
        private readonly ILogger _logger;

        /// <summary>
        /// 初始化一个<see cref="RandomSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public RandomSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Random";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            SlaveDatabaseOptions slave = Random.NextItem(slaves);
            _logger.LogDebug($"随机选取了“{slave.Name}”的从数据库");
            return slave;
        }
    }
从数据库顺序选择器SequenceSlaveDatabaseSelector
    /// <summary>
    /// 顺序轮询从数据库选择器
    /// </summary>
    public sealed class SequenceSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly object LockObj = new object();
        private int _sequenceIndex;
        private readonly ILogger _logger;

        /// <summary>
        /// 初始化一个<see cref="SequenceSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public SequenceSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Sequence";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            lock (LockObj)
            {
                if (_sequenceIndex > slaves.Length - 1)
                {
                    _sequenceIndex = 0;
                }

                SlaveDatabaseOptions slave = slaves[_sequenceIndex];
                _logger.LogDebug($"顺序选取了“{slave.Name}”的从数据库,顺序号:{_sequenceIndex}");
                _sequenceIndex++;

                return slave;
            }
        }
    }
从数据库平滑加权选择器WeightSlaveDatabaseSelector
    /// <summary>
    /// 平滑权重从数据库选择器
    /// </summary>
    public sealed class WeightSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly object LockObj = new object();
        private readonly ILogger _logger;
        private Queue<int> _queue = new Queue<int>();

        /// <summary>
        /// 初始化一个<see cref="WeightSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public WeightSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Weight";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            lock (LockObj)
            {
                if (_queue.Count == 0)
                {
                    _queue = GetIndexes(slaves);
                }

                int index = _queue.Dequeue();
                SlaveDatabaseOptions slave = slaves[index];
                _logger.LogDebug($"平滑权重选取了“{slave.Name}”的从数据库,权重:{slave.Weight}");
                return slave;
            }
        }

        private static Queue<int> GetIndexes(SlaveDatabaseOptions[] slaves)
        {
            SlaveDatabaseOptionsWrap[] wraps = slaves.Select(m => new SlaveDatabaseOptionsWrap(m)).ToArray();
            int sum = wraps.Sum(m => m.Weight);
            Queue<int> queue = new Queue<int>();
            for (int i = 0; i < sum; i++)
            {
                int index = NextIndex(wraps);
                queue.Enqueue(index);
            }

            return queue;
        }

        private static int NextIndex(SlaveDatabaseOptionsWrap[] wraps)
        {
            int index = -1, total = 0;
            for (int i = 0; i < wraps.Length; i++)
            {
                SlaveDatabaseOptionsWrap wrap = wraps[i];
                wrap.Current += wrap.Weight;
                total += wrap.Weight;
                if (index == -1 || wraps[index].Current < wrap.Current)
                {
                    index = i;
                }
            }

            wraps[index].Current -= total;
            return index;
        }


        private class SlaveDatabaseOptionsWrap : SlaveDatabaseOptions
        {
            public SlaveDatabaseOptionsWrap(SlaveDatabaseOptions slave)
            {
                Weight = slave.Weight;
            }

            /// <summary>
            /// 获取或设置 当前权重
            /// </summary>
            public int Current { get; set; }
        }
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Feature 🔨 新功能,新特性 Finished ✔️ 实现并完工
Projects
None yet
Development

No branches or pull requests

1 participant