Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mq] mq knowledge #122

Open
Alice52 opened this issue Jun 15, 2020 · 8 comments
Open

[mq] mq knowledge #122

Alice52 opened this issue Jun 15, 2020 · 8 comments
Assignees

Comments

@Alice52
Copy link
Owner

Alice52 commented Jun 15, 2020

核心

  1. 消息队列技术选型
  2. 高可靠、高可用和高性能
  3. 消息不重复、不丢失
  4. 性能
  5. 扩展行: 水平
  6. 社区

分布式系统

  1. 最基本需求: 通信
  2. 特点: 3v-3h
    • 3v
      • 海量
      • 实时
      • 多样
    • 3h
      • 高并发
      • 高可靠
      • 高性能
  3. 底层技术面: 高性能通信、海量数据存储、高并发等.
  4. 消息队列:
    • 功能简洁
    • 结构清晰
    • 入门简单
    • 深度足够

knowledge list

mq-knowledge-list

  1. 应用

    • 日志
    • 监控
    • 微服务
    • 流计算
    • ETL
    • IoT
    • other
  2. 实现技术

    • 网络通信
    • 序列化反序列化
    • 一致性协议
    • 分布式事务
    • 异步编程
    • 数据压缩
    • 内存管理
    • 文件与高性能 IO
    • 高可用分布式系统

哪些问题适合使用消息队列来解决

  1. 异步处理: 秒杀

mq-seckill

  1. 流量控制:

    • 自身能力范围内尽可能多地处理请求, 拒绝处理不了的请求并且保证自身运行正常
    • 使用消息队列隔离网关和后端服务, 以达到流量控制和保护后端服务的目的

    mq-seckill-access-control

    • 能预估出秒杀服务的处理能力, 就可以用消息队列实现一个令牌桶
      • 单位时间内只发放固定数量的令牌到令牌桶中, 规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,
      • 如果令牌桶中没有令牌, 则拒绝请求. 这样就保证单位时间内, 能处理的请求不超过发放令牌的数量, 起到了流量控制的作用.

    mq-seckill-token-bucket

Selection_076

  1. 服务解耦
  2. 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
  3. 连接流计算任务和数据
  4. 用于将消息广播给大量接收者

issue

  1. 消息堆积时不适合使用 RabbitMQ, 考虑使用 RocketMQ、Kafka 和 Pulsar
@Alice52
Copy link
Owner Author

Alice52 commented Jun 15, 2020

其他应用

  1. 用消息队列来做异构数据库之间的数据同步: 顺序问题

@Alice52
Copy link
Owner Author

Alice52 commented Jun 15, 2020

消息中间件的选型

  1. 要求

    • 开源
    • 消息的可靠传递: 确保不丢消息
    • Cluster: 支持集群, 确保不会因为某个节点宕机导致服务不可用
    • 性能: 具备足够好的性能, 能满足绝大多数场景的性能要求
  2. rabbit MQ: 队列模型

    • 优点:
      • AMQP 协议
      • 轻量快捷
      • exchange 模式的路由
      • 社区活跃
    • 缺点
      • 消息堆积
      • 性能相对不好: 每秒钟可以处理几万到十几万条消息
      • 不适合扩展和二次开发
  3. RocketMQ: 发布 - 订阅模型

    • 优点
      • 低时延
      • 性能: 每秒钟大概能处理几十万条消息
      • 每个主题包含多个队列, 通过多个队列来实现多实例并行生产和消费 进而水平扩展
      • RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的
    • 缺点
      • 周边生态系统的集成和兼容程度要略逊一筹
  4. Kafka: 发布 - 订阅模型

    • 优点
      • 周边生态系统的集成和兼容程度最好, 大数据和流计算领域
      • 设计上大量使用了批量和异步的思想
      • 性能: Kafka 的极限处理能力可以超过每秒 2000 万条消息
    • 缺点
      • 同步收发消息的响应时延比较高
      • 先攒一波再一起处理: Kafka 不太适合在线业务场景
  5. activeMQ

  6. Pulsar: 采用存储和计算分离的设计


different between MQ

  1. rabbitmq 通过 EXCHANGE 将 同一份消息 发送到多个 QUEUE

    rabbitmq-topic

  2. 其他 MQ 产品是将消息发送到 TOPIC, 订阅者逻辑接受

    mq-topic


conclusion

  1. 消息队列不是系统核心, 对**消息队列功能和性能*都没有很高的要求, 只需要一个开箱即用易于维护的产品: RabbitMQ
  2. 处理在线业务, 比如在交易系统中用消息队列传递订单, 低延迟和金融级的稳定性, RocketMQ
  3. 处理海量的消息, 像收集日志、监控信息或是前端的埋点这类数据, 或是应用场景大量使用了大数据、流计算相关的开源产品, Kafka

@Alice52
Copy link
Owner Author

Alice52 commented Jun 16, 2020

issue

  1. 消息堆积时不适合使用 RabbitMQ, 考虑使用 RocketMQ、Kafka 和 Pulsar
  2. 为了确保消息的 由于网络或服务器故障丢失"请求 - 确认" 机制
    • 生产者发送消息到 broker, broker 需要回复确认, 否则生产者会重发
    • 消费者正确消费 broker 的消息之后, 需要回复确认, 否则会给消费者重发这条消息
    • issue: 带来了消息的顺序消费的问题 消息空洞 有序性
      • 每个主题在任意时刻, 至多只能有一个消费者实例在进行消费, 那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能.为了解决这个问题, RocketMQ 在主题下面增加了队列的概念

@Alice52
Copy link
Owner Author

Alice52 commented Jun 16, 2020

分布式事务的实现

rocketMQ

rocketmq-transaction

  • 第4步的失败问题:
    • rocket mq: 事务反查的机制

      rocketmq-transaction2 png

      • kafka: 会报错, 用户捕捉重试或者向前处理

reference

  1. rocket-mq
  2. rabbit-mq

@Alice52
Copy link
Owner Author

Alice52 commented Jun 16, 2020

丢消息

检测消息丢失的方法

  1. 使用分布式链路追踪系统
  2. 或者给每个消息都来一个一次递增的序号: 消息的有序性
    • Topic 不会严格顺序的消费, 只是分区顺序消费
    • Producer 是多实例的话, 则算出投放到指定分区, 且 每个 producer 维护各自序号
    • Consumer 实例的数量最好和分区数量一致, 做到 Consumer 和分区一一对应

确保消息可靠传递: Producer -- Broker -- Consumer

  1. 生产阶段

    • 请求确认机制,来保证消息的可靠传递
    • 编码: 正确处理返回值或者捕获异常
      • 同步发送时, 只要注意捕获异常即可
      try {
          RecordMetadata metadata = producer.send(record).get();
          System.out.println(" 消息发送成功。");
      } catch (Throwable e) {
          System.out.println(" 消息发送失败!");
          System.out.println(e);
      }
      • 异步发送时, 要回调方法里进行检查
      producer.send(record, (metadata, exception) -> {
          if (metadata != null) {
              System.out.println(" 消息发送成功。");
          } else {
              System.out.println(" 消息发送失败!");
              System.out.println(exception);
          }
      });
  2. 存储阶段

    • 只要 Broker 在正常运行,就不会出现丢失消息的问题
    • 如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息
    • 单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给Producer 返回确认响应
    • 集群,需要将 Broker 集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应
  3. 消费阶段

    • 成功消费后, 才会给 Broker 发送消费确认响应
    • 编码: 不要在收到消息后就立即发送消费确认, 而是应该在执行完所有消费业务逻辑之后, 再发送消费确认
    def callback(ch, method, properties, body):
        print(" [x] 收到消息 %r" % body)
        # 在这儿处理收到的消息
        database.save(body)
        print(" [x] 消费完成 ")
        # 完成消费业务逻辑后发送消费确认响应
        ch.basic_ack(delivery_tag = method.delivery_tag)
        
    channel.basic_consume(queue='hello', on_message_callback=callback)

@Alice52
Copy link
Owner Author

Alice52 commented Jun 16, 2020

MQTT

  1. At most once: 至多一次
  2. At least once: 至少一次
  3. Exactly once: 恰好一次

重复消费: ACK 丢失

  1. 用幂等性解决重复消息问题: At least once + 幂等消费 = Exactly once
    • 幂等(Idempotence): 如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性
    • 幂等: 其任意多次执行所产生的影响均与一次执行的影响相同

常用的设计幂等操作

  1. 利用数据库的唯一约束实现幂等

    • 转账: 转账流水表[转账单 ID、账户 ID 和变更金额], 账单 ID 和账户 ID 这两个字段联合
      起来创建一个唯一约束, 只能插入一条记录, 之后异步异步操作更新用户余额
    • redis SETNX, INSERT IF NOT EXIST”语义的存储类系统
  2. 为更新的数据设置前置条件

    • 设置前置条件, 执行一次后改变, 之后就不会执行
    • 转账: 如果账户 X 当前的余额为 500 元,将余额加100 元
    • 给你的数据增加一个版本号属性
  3. 记录并检查操作

    • 在执行数据更新操作之前,先检查一下是否执行过这个更新操作
    • 在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费
    • 检查消费状态,然后更新数据并且设置消费状态

@Alice52
Copy link
Owner Author

Alice52 commented Jun 16, 2020

消息积压

  1. 发送端性能优化: 增加批量或者是增加并发

    • 发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的
      • 发送网络请求之前: 发送端准备数据、序列化消息、构造请求等逻辑的时间
      • 发送消息和返回响应在网络传输中的耗时;
      • Broker 处理消息的时延
  2. 消费端性能优化: 增加并行的消费者需要同步扩容分区数量

    • 在扩容 Consumer 的实例数量的同时, 必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的
      • 在每个分区上实际上只能支持单线程消费
        Selection_075
      • 上图消息丢失由于内存内的消息没有被及时消费
  3. 排查

    • 发送消息的速度还是消费消息的速度和原来都没什么变化: 考虑是不是消费失败导致的

@Alice52
Copy link
Owner Author

Alice52 commented Aug 10, 2020

use dimensions

1. 环境

  1. 社区
  2. 安装-docker
  3. 性能
  4. 集群

2. 消息

  1. 结构
  2. 相关的概念
  3. 消息丢失
  4. 消息重复
  5. 消息积压
  6. 消息持久化
  7. 消息 retry: 生产者, 消费者
  8. 事务

3. 生产者

  1. 定时投递
  2. 异步投递
  3. 延时投递
  4. 批量投递

4. 消费者

  1. 消费幂等
  2. 顺序消费
  3. 延时消费
  4. ack
  5. 批量消费
  6. 并发消费
  7. 消费速度
    • 增加消费者
    • 提高Prefetch count
    • 多线程处理
    • 批量Ack
    • ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants