> For the complete documentation index, see [llms.txt](https://nixum.gitbook.io/note/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://nixum.gitbook.io/note/xiao-xi-dui-lie.md).

# 消息队列

\[TOC]

## 使用场景

* 秒杀系统，一般秒杀系统处理包含几个步骤：风险控制、库存锁定、生成订单、短信通知、更新统计数据等，而决定秒杀是否成功只在前两个步骤，后续的操作就可以通过消息队列**异步**处理完成，加快整个流程的处理，**减少等待时间，提升并发量**。
* 隔离网关和后端服务，实现**流量控制，保护**后端服务，但会增加系统调用链，导致总体响应变长，异步增加系统复杂性。
* 令牌桶，目的也是进行流量控制。
* 服务解耦，数据同步，比如订单系统在订单状态发生变化时发出消息通知，其他服务订阅后做相应处理。
* 连接流计算任务和数据，比如集群日志处理，大数据统计
* 将消息广播给其他接收者

### 好处

流量削峰和流量控制、异步处理、解耦、广播、最终一致性

### 缺点

可用性降低、复杂度提高、一致性问题、消息延迟

## 常见消息队列

| 特性             | ActiveMQ                                                                                                                        | RabbitMQ                                                                                                                                                                                    | RocketMQ                                                                                                                                                     | Kafaka                                                                                                                                                                                                                                         |
| -------------- | ------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 单机吞吐量          | 万级，吞吐量比RocketMQ和Kafka要低了一个数量级                                                                                                   | 万级，吞吐量比RocketMQ和Kafka要低了一个数量级                                                                                                                                                               | 10万级，RocketMQ也是可以支撑高吞吐的一种MQ                                                                                                                                  | 10万级别，这是kafka最大的优点，就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景                                                                                                                                                                                         |
| topic数量对吞吐量的影响 |                                                                                                                                 | 使用队列模型，通过Exchange模块实现发布-订阅模型，Exchange位于生产者和队列之间，由Exchange决定将详细投递到哪个队列。                                                                                                                      | topic可以达到几百，几千个的级别，吞吐量会有较小幅度的下降这是RocketMQ的一大优势，在同等机器下，可以支撑大量的topic                                                                                           | topic从几十个到几百个的时候，吞吐量会大幅度下降。所以在同等机器下，kafka尽量保证topic数量不要过多。如果要支撑大规模topic，需要增加更多的机器资源                                                                                                                                                             |
| 可用性            | 高，基于主从架构实现高可用性                                                                                                                  | 高，基于主从架构实现高可用性                                                                                                                                                                              | 非常高，分布式架构                                                                                                                                                    | 非常高，kafka是分布式的，一个数据多个副本，少数机器宕机，不会丢失数据，不会导致不可用                                                                                                                                                                                                  |
| 消息可靠性          | 有较低的概率丢失数据                                                                                                                      |                                                                                                                                                                                             | 经过参数优化配置，可以做到0丢失                                                                                                                                             | 经过参数优化配置，消息可以做到0丢失                                                                                                                                                                                                                             |
| 时效性            | ms级                                                                                                                             | 微秒级，这是rabbitmq的一大特点，延迟是最低的                                                                                                                                                                  | ms级                                                                                                                                                          | 延迟在ms级以内                                                                                                                                                                                                                                       |
| 功能支持           | MQ领域的功能极其完备                                                                                                                     | 基于erlang开发，所以并发能力很强，性能极其好，延时很低                                                                                                                                                              | MQ功能较为完善，还是分布式的，扩展性好                                                                                                                                         | 功能较为简单，主要支持简单的MQ功能，在大数据领域的实时计算以及日志采集被大规模使用，是事实上的标准                                                                                                                                                                                             |
| 优劣势总结          | 非常成熟，功能强大，在业内大量的公司以及项目中都有应用。偶尔会有较低概率丢失消息，而且现在社区以及国内应用都越来越少，官方社区现在对ActiveMQ 5.x维护越来越少，几个月才发布一个版本而且确实主要是基于解耦和异步来用的，较少在大规模吞吐的场景中使用 | erlang语言开发，性能极其好，延时很低；吞吐量到万级；拥有灵活的路由配置；MQ功能比较完备而且开源提供的管理界面非常棒，用起来很好用；社区相对比较活跃；RabbitMQ确实吞吐量与其他几个相比会低一些，这是因为他做的实现机制比较重；对消息的堆积支持不是很好，当有大量消息积压时，会导致RabbitMQ性能急剧下降；erlang开发，比较小众，很难读源码，很难定制和掌控。 | 接口简单易用，日处理消息上百亿之多，可以做到大规模吞吐，性能也非常好，分布式扩展也很方便，社区维护还可以，可靠性和可用性都是ok的，还可以支撑大规模的topic数量，支持复杂MQ业务场景。java实现，源码易读；中文社区活跃度，文档相对来说简单，接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。 | kafka的特点其实很明显，就是仅仅提供较少的核心功能，但是提供超高的吞吐量，ms级的延迟，极高的可用性以及可靠性，而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可，保证其超高吞吐量。而且kafka唯一的一点劣势是有可能消息重复消费，那么对数据准确性会造成极其轻微的影响，在大数据领域中以及日志采集中，这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。异步批量操作性能极佳，但是同步收发信息因为要攒一批消息才进行处理会导致响应延时比较高。 |

### RabbitMQ

模型：生产者 -》exchange交换器 -》队列 -》消费者

生产者通过指定一个RoutingKey来选择对应的交换器，一个交换器可以对接多个队列，队列通过BindingKey绑定对应的交换器。

生产者只能把消息发送给交换器，发送时指定RoutingKey，当消息的RoutingKey与某个队列的BindingKey匹配时，消息从交换器路由到队列，队列才是真正存储消息的地方。

**交换器的4种类型**：

* fanout：扇形发出，把所有发送到该交换器的消息路由到所有与其绑定的队列中，此时RoutingKey不起作用；
* direct：把消息路由到BindingKey和RoutingKey完全匹配的队列中；
* headers：不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差，基本用不到；
* topic：虽然也是BindingKey和RoutingKey进行匹配才能转发到对应的队列，但是支持模糊匹配；

RabbitMQ有个默认的交换器，名字是空串，类型为direct，所有队列以自己的名字为BindingKey绑定到该默认的交换器上，这样生产者如果要发送消息到默认的交换器上时，指定RoutingKey为对应队列的名字即可，看起来就好像是直接发给某个队列一样了。

**消息传输**：

消息基于信道channel传输，信道是建立在真实的TCP连接内的虚拟连接，且每条TCP连接上的信道数量没有限制。push模式下，若一个队列有多个消费者，RabbitMQ默认将以轮询的方式发送给消费者，还有另一种模式是根据消费者的消费能力来做分发，空闲的消费者将处理更多的消息。

默认情况下，无法被路由的消息会被直接丢弃，如果设置了`mandatory=true`并配合ReturnListener，可以实现消息回发给备用交换器。

**消息去重**：

* 一般是保证消费者消费时幂等，比如：在消息体加入一个业务id来实现
* 消息由生产者产生时，MQ内部会对每条消息生产一个inner-msg-id，作为去重的依据，避免重复消息进入队列

**RabbitMQ事务**：

生产者在发送数据之前，开启RabbitMQ事务，然后发送消息，如果消息没有被RabbitMQ接收到，生产者就会收到异常报错，此时会回滚消息或者重试；

事务机制是同步的，因此吞吐量不高，太耗性能，一般使用生产者确认模式保证消息不丢。

**生产者确认模式**：

* 生产者确认模式只确保消息正确到达交换器，对于从交换器路由到队列失败的消息，则会被丢弃掉，导致消息丢失，so 对于不可路由的消息，则有Return消息机制（通过在交换器设置回调函数处理，监听路由不可达消息）和备份交换器（路由不可达的消息都发往这个交换器）来解决；
* 将信道设置成confirm模式，此时需要发送方确认，所有在信道上发布的消息都会被指定一个唯一的ID，一旦消息被投递到目标队列、或者消息被写入磁盘后，信道会发送一个ack给生产者；
* 如果RabbitMQ发生内部错误而导致消息丢失，会发生一条 nack（not ack）消息给生产者表示未确认；
* 发送方确认模式是异步的，生产者在等待确认的同时，可以继续发送消息，当生产者接收到确认消息后，生产者的回调方法就会触发来处理确认消息；

**消费者确认模式**：

* 消费者接收的每一条消息后都必须进行确认，消费者只有确认了消息，RabbitMQ才会安全地把消息从队列中删除；
* RabbitMQ仅通过消费者的连接中断来确认是否需要重新发送消息，只要连接不中断，RabbitMQ会给消费者足够长的时间来处理消息，保证数据的最终一致性；
* 如果消费者接收到消息后没法发送确认消息，连接也未断开，RabbitMQ会认为该消费者繁忙，之后不会再给该消费者发放更多的消息；
* 不设置自动ack，消费者在处理完消息之后收到发送ack给RabbitMQ；

**RabbitMQ持久化机制**：

RabbitMQ持久化消息后，如果RabbitMQ挂了，恢复只会会自动读取之前存储的数据，持久化设置有两个步骤，必须同时设置才能保证消息不丢，持久化机制一般配合生产者确认模式，只有消息被持久化到磁盘了，才会给生产者发送ack。

* 创建队列时，设置持久化，确保RabbitMQ能持久化队列的元数据，但它并不会持久化消息的数据；
* 生产者发送消息时设置 `deliberyMode = 2` ，此时RabbitMQ才会将消息数据持久化到磁盘上；

消费者一旦从队列中消费了一条消息并回复ack，RabbitMQ就会从持久化日志中移除这条消息，so 在消费消息前，如果RabbitMQ重启，服务器会自动重建交换器和队列，加载持久化日志中的消息到对应的队列或交换器上，保证消息不会丢失。

**高可用**：

都是基于主从部署，有普通集群模式和镜像集群模式，

* 普通集群模式：多台机器，每台部署一个RabbitMQ实例，我们创建的队列，**只会放在一个实例上，但每个实例都会同步队列里的元数据**（元数据是队列的一些配置信息，可以通过其找到队列所在的实例），消费的时候，如果**连接到了另一个实例，那么该实例会从队列所在实例上拉去数据过来消费**，实现多个节点服务某个队列的读写操作；
* 镜像集群模式：我们所创建的队列，**无论元数据还是队列里的消息都会存在多个实例上**，即每个RabbitMQ节点都有这个队列的一个完整镜像，包含队列里的全部数据。**队列每次收到消息时，都会自动把消息同步到多个实例的队列上，每个节点都有这个队列的完整数据**，所以任意一台机器宕机了，还可以从别的机器进行消费，但这种方式对带宽压力比较大；

### Kafka

* Topic：主题，一个主题可以分为一个或多个分区；
* Broker：一台Kafka就是一个Broker，一个集群由多个Broker组成，一个Broker可以容纳多个Topic；Broker负责接收和处理生产者发送的消息，**Broker内可以将每个主题topic配置成M个分区**（类似mongo的分片），每个分区是一组有序的消息日志，每条消息的位移从0开始，依次递增；

  Broker持久化时，使用消息日志来保存数据，日志是追加写入的物理文件，避免的随机IO，提升吞吐量；一个Topic中的一个分区大文件会被分成多个小的文件端，即一个日志会被细分成多个日志段，消息被追加写到当前最新的日志段中，写满一个日志段后，Broker会自动切分出一个新的日志段，并将老的日志段封存，后台有定时任务定期检查老的日志段是否能被删除，回收磁盘空间；

  每个日志段的默认大小是1G，当文件大于1G时，会滚动一个新的日志段，并且以上一个日志段最后一条消息的位移命令；

  通过索引信息将元数据全部映射到内存；通过索引信息就能快速定位消息和确定response的最大大小；通过索引文件稀疏存储，大幅降低索引文件元数据占用空间大小；
* Partition：分区，**一个Topic可以分布到多个Broker上**，每个分区是一个有序队列，生产者生产的**每条消息只会被发送到一个分区**中，分区中的每条消息都会被分配一个有序的id（offset）；

  Kafka只保证一个分区中的每条消息按顺序消费，不保证一个Topic中多个分区间的消费顺序；

  第一个分区（编号为0）的第一个副本放置的位置是随机从Broker列表中选择的，其他分区的第一个副本放置位置相对于第0个分区依次往后移，剩余的副本相当于第一个副本放置位置由`nextReplicacShift`决定，该数也是随机产生的；

  **每个分区可以配置多个副本，每个分区的副本只能有一个对外提供服务，其他的副本仅作备份**，即其不允许读写分离，原因是kafka主从同步比较耗时，会产生延时，导致数据一致性问题；

  分区仅支持增加，不允许减少，不然分区减少时，原有的数据不好分配，且破坏了分区有序，实现起来很复杂；

  分区的好处是实现负载均衡，对于消费者来说，提高并发度，提升吞吐量。
* Consumer Group：消费组，是Kafka实现广播和单播的手段，由多个消费者实例共同组成一个消费组来消费某一组主题，这组主题中的每个分区都只会被组内一个消费者消费，组内的其他消费者都不能消费；通过同一消费组内多消费者同时消费，加速消费吞吐量；
* Offset：Kafka存在两种位移字段，一个是消息位移，表示分区中每条消息的位置，单调递增；另一个是消费者位移，表示每个消费者在消费过程中会记录它当前消费到分区的哪个位置上，代表消费者的消费进度；

  Kafka的存储文件通过 `[offset].kafka` 来命名，方便根据offset进行查找；
* Rebalance：消费组有重平衡功能，当组内某个消费者挂掉后，其他消费者会自动重新分配订阅主题的分区进行消费；
* Kafka参数topic级别的参数的优先级会大于broker级别的参数；
* Kafka是基于日志结构的消息引擎，消费者在消费消息时，只是从磁盘文件上读取数据而已，消费者不会删除消息，只是修改对应的位移，而RabbitMQ和ActivityMQ在消息消费成功后，是直接删除消息的；
* ZooKeeper在Kafka中的作用：
  * Broker注册，每个Broker启动时，都会在`/brokers/ids`创建自己的节点，记录自己的IP地址和端口；
  * Topic注册，由于**同一个Topic的消息会被分成多个分区并将其分布在多个Broker上**，分区与Broker的映射由ZooKeeper维护；
  * 提供负载均衡能力，ZooKeeper会尽力将分区分布到不同的Broker节点上，生产者产生消息后也会尽量投递到不同Broker的分区里，当消费者消费时，ZooKeeper会根据当前的分区数量和消费者数量实现动态负载均衡。
* Kafka判断节点是否存活的条件：
  * 节点必须可以维护和ZooKeeper的连接，ZooKeeper通过心跳机制检查每个节点的连接；
  * 如果节点是Follower，它必须能及时同步Leader的写操作，保持在一定的延时内；
* 总结起来Kafka实现高吞吐率的原因：顺序读写、零拷贝、文件分段、批量发送、数据压缩、分区、高效的日志分段及索引设计、重复利用page cache、reactor网络模型、支持多磁盘驱动、mmap操作内存很快，减少用户空间到内核空间的拷贝等；

  缺点：由于是批量发送，数据并非真正实时、不支持MQTT协议、仅支持分区内消息有序，无法实现全局消息有序、需要依赖Zookeeper进行元数据管理；

#### 分区机制

* 生产者发送消息给到某一主题的Broker，最后在保存到该主题下的某一个分区，而不会在多个分区中被保存多份。
* **分区的作用是提供负载均衡的能力，不同的分区能被放置在不同的节点Broker上**，对数据的读写操作也是以分区为粒度而进行的，使得每个节点的机器都能独立地指向各自分区的读写请求，**以此提高吞吐量和伸缩性**。
* 分区也能实现消息的顺序性，比如一个主题设置一个分区，或者生产者发送消息时指定key或者分区，消息的顺序性只保证在一个分区内有序，通过位移实现；
* 分区策略，要合理使用分区策略，避免消息数据产生倾斜，对某些分区产生性能瓶颈，导致下游消费能力下降
  * 轮询策略，顺序分配：比如一个主题下有3个分区，生产者发送消息时会被依次分配到每个分区，默认的分区策略，具有非常高的负载均衡能力，最大限度保证消息被平均分配到所有分区中；
  * 随机策略：消息会被随机分配到某一分区上；
  * 按消息的key保序策略：根据不同的key分配到不同的分区上，有点像RabbitMQ的RoutingKey；

#### 消息压缩

Kafka消息分为两层：消息集合 和 消息，一个消息集合包含若干条日志项，日志项才是真正封装消息的地方，Kafka不会直接操作具体的一条条消息，而是在消息集合这个层面进行写入操作。

* 压缩是时间换空间，目的是用较小的CPU资源换更少的磁盘占用和带宽、网络IO传输；
* 压缩的地方有两处：生产者端和Broker端，一般来说，生产者在发送消息时会指定压缩算法，比如GZip；大部分情况下，Broker只是原样保存消息，但会在两种情况下对消息重新压缩，比如Broker端和生产者端使用不同的压缩算法，此时Broker接收到消息后会先解压在重新进行压缩；或者Broker为了兼容 V1 或 V2 两个版本的消息，此时也会发生消息格式转换，这种场景下对性能的影响很大，还会用不上 零拷贝 特性；
* 解压的地方有两处：消费者端和Broker端，消费组端会根据消息集合中指定的压缩算法进行解压；Broker解压则是因为需要对消息进行校验，但这种场景要区别于上面两种；
* 压缩算法中，吞吐量方面：LZ4 > Snappy > zstd、GZip，压缩比方面：zstd > LZ4 > GZip > Snappy，Snappy比较耗CPU资源，其他差别不大；

#### 消息可靠性

* Kafka只对 已提交 的消息做 有限度 的持久化保证。

  已提交：Broker成功地接收到一条消息并写入到日志文件后，告诉生产者这条消息已成功提交，我们可以指定是一个Broker成功保存算已提交或者多个Broker都保存成功才算成功。

  有限度的持久化保证：只要不是所有的Broker都挂掉，就能保证消息永远不会丢。
* 生产者发送消息给Broker时是异步的，Kafka sdk会在生产者那攒一批消息在一起发送，如果此时出现问题就会丢消息，所以一般要配合回调函数，等Broker通知生产者消息发送成功；或者使用同步发送的方法发送消息；
* 每一个消费者在消费消息时，在Broker上有一个位移标识，表示当前消费到 topic分区的哪个位置，所以消费者在消费时要保证先消费消息，再更新位移标识，避免更新标识后消息消费被中断，导致消息丢失，so一般最好手动提交位移更新，不开启自动；

#### 连接管理

生产者：

* 建立TCP连接：Kafka Java SDK中，生产者会在启动时，读取`bootstrap.servers`得到Broker的地址，然后与这些地址上的Broker建立TCP连接，通常`bootstrap.servers`不需要配置太多的Broker地址，3\~4个足以，一旦生产者连接到集群中其中一台Broker，就能拿到整个集群的Broker信息，此时再重新建立TCP连接，此外，生产者也会每5分钟获取集群的metadata来更新Broker信息，创建新的TCP连接。
* 关闭TCP连接：用户主动关闭；Kafka默认设定，在9分钟内如果没有任何请求经过该TCP连接，就会主动关闭；

消费者：

* 建立TCP连接：消费者poll拿消息的时候，消费者首次启动后调用poll方法，会与任一Broker建立连接，获得它的协调者对应的Broker，再与该Broker建立连接。此时消费者就知道之后消息获取要通过哪个Broker了，当消费的连接断开后，下次会直接向该Broker发起连接。

  总共会建立3种连接：1. 确定协调者和获取集群元数据；2. 连接协调者，使其可以进行管理操作；3. 拉取消息进行消费，之后基本上就只有2和3两种TCP连接了
* 关闭TCP连接：用户主动关闭；Kafka默认设定，在9分钟内如果没有任何请求经过该TCP连接，就会主动关闭；

无论是生产者的PRODUCE请求还是消费者FETCH请求发送到Broker，Broker都会采用Reactor模型处理请求。默认情况下，`num.network.threads = 3`，即每台Broker启动时会创建3个网络线程处理客户端的请求，当网络线程拿到请求后，将请求放到一个共享请求队列中，接下来，Broker有个IO线程池，专门从共享请求队列中取出请求，执行真正的处理，默认的IO线程数是`num.io.threads = 8`，IO线程处理完成后，将生成的响应发送到网络线程池的响应队列中，由对应的网络线程将Response返回给客户端，即网络线程只处理请求的接收和响应，真正处理逻辑在IO线程。

所有网络线程共享同一个请求队列，但每个网络线程拥有自己的请求响应队列。

Purgatory组件用于缓存延时请求，比如设置了`acks=all`的生产者请求，此时请求必须等待ISR中所有副本都接收到了消息后才能返回，此时处理该请求的IO线程就必须等待其他Broker的写入结果，因为要等待结果，请求不能立即被处理，就会暂存在Purgatory中，直到获取到结果，才将响应放到对应的网络线程的响应队列中。

生产者的PRODUCE请求和消费者的FETCH请求，都属于数据类请求，LeaderAndIsr、StopReplica请求称为控制类请求，控制类请求可以直接令数据类请求失效，优先级较高，Kafka内部使用两套网络线程池和IO线程池处理这两类请求。之所以要这样做，是为了解决当有消息积压在Broker中，但此时发生副本切换或删除主题时，导致请求不能被立马感知，只能等到超时的问题。

![](https://github.com/Nixum/Java-Note/raw/master/picture/Kafka%E5%A4%84%E7%90%86%E8%AF%B7%E6%B1%82.png)

#### 实现 精确一次 QoS

* 幂等性：生产者在发送消息时，会多带一些信息给Broker，使得Broker可以通过这些信息对重复性消息去重；

  生产者的幂等性只能保证某一主题下的一个分区、单会话上不会出现重复性消息，无法实现多个分区、跨会话幂等，多分区或跨会话幂等只能依赖事务；
* 事务性：Kafka支持提交读隔离级别的事务，保证多条消息原子性写入目标分区，且保证消费者只能看到事务提交成功的消息；多条消息在一个事务内被提交到Broker，要么全部成功，要么全部失败，就算写入失败，Broker也会把它们写入底层的日志中，消费者可以选择读或者不读这些消息，读取是隔离级别的未提交读，不读取则是提交读；

#### 消费组

* 是一种可扩展且具有容错性的消费机制，一个消费组内有多个消费者或实例，共享一个公共的Group ID，组内的所有消费者协调在一起消费某个或多个主题下所有分区，每个分区可以被多个消费组消费，但只能由同一个消费组内的一个消费者来消费；
* 如果所有消费者都属于同一个消费组，那就是点对点模型，如果所有消费者被分散在不同的消费组，则是发布/订阅模式；
* 一般情况下，消费者实例的数量等于该消费组订阅主题的分区总数，如果大于分区总数，多余的实例并不会被分配任何分区，会永远空闲，比较浪费；
* Broker会保存每个消费组消费分区的位移，老版本中，每个消费组的位移是保存在ZooKeeper中，但是由于位移的更新比较频繁，而ZooKeeper是个CP要保存数据一致性，并不适合频繁更新，会有性能问题，所以新版本分区位移是保存在Broker的主题中；

#### 重平衡Rebalance

是一种协议，会对一个主题下的所有分区平均分给一个组里的所有消费者，由Broker通过一定的分配规则进行协调，**为消费组内的每个消费者分配该主题下要消费的分区**。

重平衡触发条件：

* 组内消费者数量变更，如组成员的加入、离开或者组成员崩溃；
* 主题的数量变更；
* 主题内分区数量变更；

重平衡时，所有消费者会停止消费，类似STW，直到重平衡结束，因为重平衡后，消费者可能需要重新与Broker建立TCP连接，会比较耗时间和资源；

重平衡由协调者Coordinator管理，它存在于Broker中，消费者提交位移时，是向协调者所在的Broker提交位移，然后由协调者负责执行消费组的注册、成员管理等元数据的操作；

消费组通过位移主题，确定协调者在哪个Broker上，计算方法：通过对GroupId做哈希计算，对位移主题分区数取模，得到分区号，在找到该分区副本中的Leader，该Broker就是协调者了。

每个消费者默认情况下每10s会发送心跳给协调者，表明存活，如果协调者收不到，就会认定该消费组已挂，触发重平衡；另外，默认情况下下，如果消费者在5分钟内无法拉取消息并处理完，也会被认定已挂，触发重平衡；

一般情况下，心跳发送时间间隔设置为`heartbeat.interval.ms = 2s`，会话超时时间为`session.timeout.ms = 6s`，即需要3次心跳都没被接收到，就算消费者挂掉；`max.poll.interval.ms` 表明消费者处理一批消息的最迟时间；

重平衡过程中，通过消费者的心跳线程通知其他消费者，Broker通过在心跳中设置`REBALANCE_IN_PROCESS`，响应给消费者，通知消费者要开始重平衡了。

![](https://github.com/Nixum/Java-Note/raw/master/picture/Kakfa%E9%87%8D%E5%B9%B3%E8%A1%A1%E7%8A%B6%E6%80%81%E6%B5%81%E8%BD%AC.png)

对于消费者来说，重平衡分为两个步骤：1. 加入组，JoinGroup请求； 2. 等待领导者消费者分配方案，SyncGroup请求；

1. 当组成员加入组时，会向协调者发送JoinGroup请求，在该请求中，每个成员都将自己订阅的主题上报，协调者收集所有成员的订阅信息，收集完成后，选出领导者消费者，并把组成员信息以及订阅信息发送给Leader；

   一般情况下，第一个发送JoinGroup请求的成员自动成为领导者，之后由领导者制定分区分配方案；
2. 领导者向协调者发送SyncGroup请求，领导者将 1 中制定的分配方案发送给协调者，其他成员也会向协调者发送SyncGroup请求，只是请求中没有实际内容，这一步主要是让协调者接收分配方案后，统一以SyncGroup响应的方式分发给所有成员们，使得组内所有成员都知道自己该消费哪些分区。

#### 位移主题

* 为了解决老版本**消费组消费位移管理依赖**ZooKeeper导致性能不高的问题，新版本的位移管理方案是通过`__consumer_offsets`这个主题来实现的，以达到支持高持久性和高频写操作的目的。

  该主题只是Kafka一个普通主题，同样可以对它进行操作，只是消息的格式由Kafka定义，用户无法修改，无法随意向这个主题写消息，属于内部主题，一般不需要去管理它。
* 消息体类型分为3种：
  * 一般位移消息：消息体格式是Key-Value形式的，key为`<GroupId，主题名，分区号>`，消息体除了保存位移值，还保存时间戳、用户自定义数据等元数据供Kafka做其他操作
  * 用于保存Consumer Group信息的消息
  * 用于删除Group过期位移或者删除Group的消息，一旦某个消费组下的所有消费者都停止，且它们的位移数据都已经被删除时，就会向位移主题写入这条消息，表明要删除这个消费组的信息
* 创建时机：默认情况下，第一个消费者启动时，会自动创建位移主题，默认配置是50个分区，3个副本，也可以手动通过API创建
* 位移提交分为自动提交和手动提交，类似ACK
* Broker会在后台启动一个线程，使用Compact策略来定时删除位移主题中的过期消息，避免磁盘被占满，判断的依据是消息的时间

#### 消费组消费位移

* 消费位移记录的是下一条消息的位移，而不是当前最新消费的位移，消费者会为分配给他的每个分区提交自己的位移数据，通过消费位移来表明消费者的消费速度和记录，当消费者故障重启后，还能继续消费，避免重复。
* 消费者处理消息时是一批一批处理的，处理完成后更新这一批消息的位移，如果这一批消息量太大，也可以选择处理一定数量后就更新位移，避免一次失败后要重新全部消费；
* 默认开启自动提交，设置自动提交位移时，是每5s自动提交一次，可能出现重复消费；

  同步手动提交时，虽然比较精准，但是提交时是同步的，会阻塞等待Broker的返回，此时消费者无法继续消费；

  异步手动提交，需要注册回调函数，处理Broker返回的结果，但也因为异步，无法进行重试；

#### 副本

* Kafka每个主题划分成若干分区，**每个分区可以有多个副本，副本仅提供数据冗余，提高消息存储的安全性，提高容灾能力，同一个分区的多个副本只允许有一个进行读写**。
* 副本本质上是一个只能追加写消息的提交日志，同一个分区下的所有副本保存相同的消息序列，副本分散在不同的Broker上，从而分摊部分Broker宕机带来的数据不可用问题，所以，**每台Broker都可能保存有各个主题下不同分区的不同副本**，单个Broker可以保存成百上千个副本。
* Kafka通过基于领导者的副本机制，保证所有副本的数据一致性：
  * 每个分区在创建时要选举一个副本，作为Leader，其他副本作为Follower；
  * Follower副本不对外提供服务，即不响应生产者和消费者的任何读写请求，所有读写请求都由Leader副本处理，即Leader副本所在的Broker，Follower副本唯一的任务是从Leader副本中异步拉取消息，写入到自己的提交日志中，实现日志同步；
  * 如果Leader副本挂掉，即对应的Broker宕机，Kafka依赖ZooKeeper提供的Watch监控功能实时感知，开启新Leader的选举，从Follower副本中选出一个Leader，老Leader回归后，降级为Follower副本加入；
* 这种副本机制的好处：
  * 确保生产者写入的消息，消费者能立即读到，因为如果是读Follower副本，可能存在延迟；
  * 确保消费者读取时，不会因为负载均衡之类的原因，导致一会能读到消息一会读不到；
* **Leader副本和Follower副本组成的集合叫ISR**，ISR由分区Leader维护，Kafka确保Follower副本的进度跟Leader副本的差距，不看两者之间相差多少消息，而是看Follower副本在复制消息时可被允许的最大延迟时间：`replica.lag.time.max.ms`，只要超过这个数，就说明Follower同步落后了，如果落后太多，这个Follower副本就会被踢出ISR，如果后面进度追上了，才会加回ISR；ISR中至少有一个副本，且该副本为Leader；

  **被踢出ISR的分区副本组成的集合叫OSR**，新加入的Follower也会存入OSR列表，ISR列表 + OSR列表 = AR（assigned ）。

  **当ISR为空时，说明Leader副本也挂掉了，此时需要选举**，Kafka把所有不在ISR中的存活副本称为非同步副本，通过`unclean.leader.election.enable`控制新Leader是否要从这些非同步副本中选举，如果选举，由于它们进度落后前Leader，是会造成丢数据的问题，这里就涉及到C和A的选择了，一般是不开启它，**C比较重要**，所以一般选择同步进度最高的Follower作为Leader；
* 副本中的**高水位**（Hight Water）：表示消息的位置，即位移，取值来自分区中对应的ISR列表中最小的LEO作为高水位，消费者最多只能消费到高水位所在位置的上一条信息。

  当消息位移小于高水位，属于已提交的消息，消费者只能消费已提交的消息，当消息位移大于等于高水位，属于未提交的消息，即处于已提交的消息和未提交的消息的边界就是高水位。**高水位主要是定义消息的可见性，用来标识分区下哪些消息可以被消费**，帮助Kafka完成副本同步。**提交指的是消息成功写入到Leader副本。**
* 副本中的**LEO**（Log End Offset）：**表示副本写入的日志文件中下一条消息的位移值**，同一副本下，其高水位的值不会大于LEO的值。每一个副本都有对应的高水位和LEO的值，\*\*分区的高水位就是其Leader副本的高水位，\*\*Leader副本所在的Broker，还保存了其他Follower副本的LEO值。更新时，Kafka会更新所有副本的高水位和LEO值，同时也会更新Leader副本上所有其他Follower副本的LEO值，但不更新其高水位，目的是帮助Leader副本确定其高水位。
* LSO（Last Stable Offset）：对未完成的事务而言，LSO的值等于事务中第一条消息的位置，对于已完成的事务而言，它的值与高水位相同。

**总结**：消息发送给分区的Leader后，会同步给Follower，消费者消费时，只能根据ISR列表，查看哪些消息在高水位上，才能被读取，**高水位取决于ISR列表里偏移量最小的分区**，类似木桶原理，从而保证一致性，否则，如果消息没有同步到多个Follower，Leader就宕机了，如果有落后的Follower被选举为Leader，又被消费者消费了，就破坏一致性了；引入高水位机制，如果Broker间消息的复制慢了，那么消息到达消费者的时间就会随之变长。

**副本同步机制**

流程总结：1. 生产者发送消息；2. Zookeeper找到分区Leader；3. 推送消息；4. 通过ISR列表通知给Follower；5. Follower从Leader拉取消息，发送ack；6. Leader收到所有副本的ack，更新Offset，并向生产者发送ack，表示消息写入成功。

具体流程：

* 生产者发送消息给Broker时， Leader副本，处理生产者请求流程：
  1. 写入消息到本地磁盘
  2. 更新分区高水位值
     1. 获取Leader副本所在的Broker端保存的所有远程Follower副本LEO值，LEO-1、LEO-2
     2. 获取Leader副本高水位值：currentHW
     3. 更新当前高水位值：currentHW = max{currentHW， min（LEO-1、LEO-2 ...）}
* Leader副本，处理Follower副本拉取消息的请求，流程：
  1. 读取磁盘或页缓存中的消息数据
  2. 使用Follower副本发送请求中的位移值更新远程Follower副本LEO值
  3. 更新分区高水位，流程同上
* Follower副本，从Leader副本拉取消息后的流程：
  1. 写入消息到本地磁盘
  2. 更新LEO值
  3. 更新高水位值
     1. 获取Leader发送的高水位值：currentHW
     2. 获取步骤2中更新过的LEO值：currentLEO
     3. 更新高水位为：min（currentHW，currentLEO）

![](https://github.com/Nixum/Java-Note/raw/master/picture/Kafka%E5%88%86%E5%8C%BA%E5%90%8C%E6%AD%A5%E6%9C%BA%E5%88%B6%E6%B5%81%E7%A8%8B.png)

可以看到，Follower副本拉取了两次，一次是更新LEO的值，一次是更新高水位，Leader副本高水位的更新和Follower副本高水位的更新在时间上是错配的。在`min.insync.replicas=1`的情况下，当Follower副本更新高水位时，如果此时先是Follower副本宕机，重启后会根据高水位进行日志截断，造成同步进度丢失，此时Leader也宕机，会进行Leader选举，之前的Follower可能会为Leader，就会出现数据不一致的情况，因此引入Leader Epoch机制解决。

Leader Epoch机制，实际上就是Leader版本号+起始位移值，Leader版本号在Leader每次重启后都会增加，Leader副本每次将消息写入磁盘后，都会更新起始位移值，并保存在checkpoint文件中。当有Follower宕机或重启时，从请求Leader副本获取Leader Epoch的数据进行保存，避免之后如果Leader宕机后，选出的的新Leader丢失同步进度，避免数据不一致。

#### 控制器

Kafka的控制器依赖ZooKeeperd的临时节点机制和Watch机制，实现对整个集群的管理和协调，集群中只能有一个Broker能成为控制器，第一个在ZooKeeper中成功创建controller节点的Broker会被指定为控制器。虽然控制器只有一个，但是依赖ZooKeeper的Watch机制，当运行中的控制器突然宕机或终止时，Kafka能快速感知并启动备用控制器进行替换。

控制器内部也是通过事件驱动 + 状态机的方式，去处理状态，解决多线程并发访问问题。

控制器的职责：

* 主题管理 - 创建、删除、增加分区；
* 分区重分配；
* Preferred领导者选举，主要是为了避免部分Broker负载过重而提高的一种更换Leader的方案；
* 集群成员管理 - 新增Broker、Broker主动关闭、Broker宕机；
* 保存了最全的集群元数据，向其他Broker提供数据服务，其他Broker会定期接收控制器发送过来的元数据更新请求，更新内存中的缓存数据。比较重要的元数据：主题信息、分区信息、所有Broker的信息、当前正在进行Preferred领导者选择的分区列表、分区重分配的分区列表；

#### Kafka中的时间轮

通过DelayQueue（优先级队列实现，队列里的每个元素，都是某一个具体时间的list） + 环形数组（数组的每个元素是个list，索引代表时间格）

DelayQueue会根据环形数组中的每个元素进行排序；

添加任务时，判断任务执行时间，加入环形数组中，对应的环形数组的元素（list），加入DelayQueue中。

![](https://github.com/Nixum/Java-Note/raw/master/picture/Kafka%E6%97%B6%E9%97%B4%E8%BD%AE%E7%AE%80%E5%8D%95%E5%AE%9E%E7%8E%B0.png)

## 消息模型

1.点对点：队列、监听器

2.发布订阅：监听器、监听器、观察者模式

### 一般消息队列架构

* 主要是设计中间的消息转发，将一次RPC转化成两次RPC；
* 选择通信协议；
* 消息的可靠性确认；
* 消息持久化；
* 消息模型；
* 事务特性；
* 分布式集群特性；
* 消息队列一般有两种模式，pull模式（消费方主动向队列拉取数据），比如kafka，push模式（由队列向消费方推送数据）

![消息队列架构](https://github.com/Nixum/Java-Note/raw/master/picture/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E6%9E%B6%E6%9E%84.png)

一般来说，一个主题可以分配给多个Broker，每个Broker可以有多个队列；

* 多个生产者对于同一个主题的消息，可以发送到不同的队列（轮询发，随机挑一个发，只往某个一个发）；
* 消费组订阅的是主题，消费主题里的所有队列的消息，消费消息时，可以只是去读队列里的消息，不一定会删掉，但是一般会设置消息可见性，组里的消费者读取队列里的消息时，其他消费者对此消息不可见；
* 多个消费组在消费同一主题时，消费组之间互不影响，即消息可以同时被多个消费组消费；
* 消费组内部可以包含多个消费者，同一消费组内，每个队列只能被一个消费者占用，一个消息只会被组内一个消费者消费；
* 每个消费组内部维护自己的消费位置，记录消费某一队列中消息的位置，消费位置与消费者无关

### 关于消息的序列化和反序列化

除了常见的json、XML、protobuf、Kryo、Hession等方案，还能自主设计

客户端和服务端维护一份字段顺序，在序列化时只存对应对象的值，从而减少序列化对象的长度，比如

```
03   | 08 7a 68 61 6e 67 73 61 6e | 17 | 01
User |    z h a n g s a n         | 23 | true
03表示对象类型，08表示名字的长度，然后字段值以名字、年龄、是否yi'wei
```

### 关于消息的断句

数据传输过程中，收到的分段不一定是发出去的分段，因此需要合理的分段，来让数据语义明确。比如在每个分段前加数据的长度

```
03下雨天 03留客天 02天留 03我不留
```

### 关于消息的存储

消息队列的存储一般比较简单，每个主题包含若干分区，每个分区(队列)都是WAL，写入时尾部追加，不允许修改，读取的时候，根据一个索引序号进行查询，连续顺序往下读。

#### Kafka

以分区为单位，每个分区包含一组消息文件和一组索引文件，消息文件和索引文件一一对应，具有相同的文件名，但文件扩展名不一样，文件名就是这个文件中第一条消息的索引序号；每个索引中保存索引序号（表示这条消息是这个分区中的第几条消息），和 对应的消息在消息文件中的绝对位置。

以分区为单位，粒度更细，比较灵活，容易进行数据迁移和扩容。

Kafka为了节省存储空间，采用稀疏索引，每隔几条消息创建一条索引。

写入消息时，在文件尾部追加写入，一个文件满了再写下一个文件；

查找消息时，先根据文件名找到所在索引的索引文件，然后二分法遍历索引文件内的索引，找到离目标最近的索引，再去消息文件中找到这条索引对应的位置，顺序遍历消息文件，找到目标消息。

#### RocketMQ

以Broker为单位，每个Broker只包含一组消息文件和索引文件，在该Broker上的所有主题的消息都存放在这一组消息文件中，索引文件则按照主题和队列分别建立，每个队列对应一组索引文件。

以Broker为单位，粒度较粗，但在写入时文件更少，有更好的批量和顺序写入。

索引采用定长稠密索引，为每条消息都创建索引，每个索引的长度固定20个字节。

写入消息时，Broker上所有主题、所有队列的消息都按照自然顺序追加写入同一个消息文件中，一个文件满了再写下一个；查找消息时，直接根据队列的消息序号，计算出索引的全局位置（索引序号 乘 固定长度20），直接读取索引，再根据索引在消息文件中找到对应的消息，查找速度比Kafka快，但耗存储。

### 消息队列服务质量QoS

* at-most-once：最多一次，消息可能丢失，不会重复发送，即消息要么被发送一次，要么不会被发送；
  * 生产者完全依赖TCP/IP的传输可靠性，发送一次就结束。kafka中设置`acks=0`打开此模式；
  * 消费者消费时先保存消息，再进行处理。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔；
* at-least-once：最少一次，消息绝不会丢，但可能重复发送，解决上一个级别消息发送失败的问题；一般使用此等级。
  * 生产者发送消息时要做防丢保证，检查发送结果，并在失败或超时时重新发送消息，直到broke确认收到，so可能会重复发送。kafka中设置`acks=1或all`并设置`retries>0`打开此模式；
  * 消费者先处理消息，再保存消费进度，处理消息时幂等。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔，或者直接关闭自动提交偏移量，处理消息后手动调用同步模式的偏移量提交。
* exactly-once：正好一次，每条消息有且只有一次被传输；
  * 生产者发送消息要保证防丢， 可能需要借助外部组件实现只发送一次。
  * 消费者处理消息时要幂等，或者对接收的消息进行去重。

这三个级别的可靠性依次增加，但相应的也会增加延迟和带宽占用。

### 案例一 - 异步消息的处理

![](https://github.com/Nixum/Java-Note/raw/master/picture/JD_JMQ%E6%B6%88%E6%81%AF%E6%8E%A5%E6%94%B6%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9E%8B.png)

> 1. 收到请求后，我们在Handler中不做过多的处理，执行必要的检查后，将请求放到一个内存队列中，也就是图中的RequestsQueue。请求被放入队列后，Handler的方法就结束了。可以看到，在Handler中只是把请求放到了队列中，没有太多的业务逻辑，这个执行过程是非常快的，所以即使是处理海量的请求，也不会过多的占用IO线程。
> 2. 由于要保证消息的有序性，整个流程的大部分过程是不能并发的，只能单线程执行。所以，接下来我们使用一个线程WriteThread从请求队列中按照顺序来获取请求，依次进行解析请求等其他的处理逻辑，最后将消息序列化并写入存储。序列化后的消息会被写入到一个内存缓存中，就是图中的JournalCache，等待后续的处理。
> 3. 执行到这里，一条一条的消息已经被转换成一个连续的字节流，每一条消息都在这个字节流中有一个全局唯一起止位置，也就是这条消息的Offset。后续的处理就不用关心字节流中的内容了，只要确保这个字节流能快速正确的被保存和复制就可以了。
> 4. 这里面还有一个工作需要完成，就是给生产者回响应，但在这一步，消息既没有落盘，也没有完成复制，还不能给客户端返回响应，所以我们把待返回的响应按照顺序放到一个内存的链表PendingCallbacks中，并记录每个请求中的消息对应的Offset。
> 5. 然后，我们有2个线程，FlushThread和ReplicationThread，这两个线程是并行执行的，分别负责批量异步进行刷盘和复制，刷盘和复制又分别是2个比较复杂的流程。刷盘线程不停地将新写入JournalCache的字节流写到磁盘上，完成一批数据的刷盘，它就会更新一个刷盘位置的内存变量，确保这个刷盘位置之前数据都已经安全的写入磁盘中。复制线程的逻辑也是类似的，同样维护了一个复制位置的内存变量。
> 6. 最后，我们设计了一组专门用于发送响应的线程ReponseThreads，在刷盘位置或者复制位置更新后，去检查待返回的响应链表PendingCallbacks，根据QOS级别的设置（因为不同QOS基本对发送成功的定义不一样，有的设置需要消息写入磁盘才算成功，有的需要复制完成才算成功），将刷盘位置或者复制位置之前所有响应，以及已经超时的响应，利用这组线程ReponseThreads异步并行的发送给各个客户端。
> 7. 这样就完成了消息生产这个流程。整个流程中，除了JournalCache的加载和卸载需要对文件加锁以外，没有用到其他的锁。每个小流程都不会等待其他流程的共享资源，也就不用互相等待资源（没有数据需要处理时等待上游流程提供数据的情况除外），并且只要有数据就能第一时间处理。
> 8. 这个流程中，最核心的部分在于WriteThread执行处理的这个步骤，对每条消息进行处理的这些业务逻辑，都只能在WriteThread中单线程执行，虽然这里面干了很多的事儿，但是我们确保这些逻辑中，没有缓慢的磁盘和网络IO，也没有使用任何的锁来等待资源，全部都是内存操作，这样即使单线程可以非常快速地执行所有的业务逻辑。
>
> 这个里面有很重要的几点优化：
>
> 第一是使用异步设计，把刷盘和复制这两部分比较慢的操作从这个流程中分离出去异步执行；
>
> 第二是使用了一个写缓存JournalCache将一个写磁盘的操作，转换成了一个写内存的操作，来提升数据写入的性能；
>
> 第三是这个处理的全流程是近乎无锁的设计，避免了线程因为等待锁导致的阻塞；
>
> 第四是把回复响应这个需要等待资源的操作，也异步放到其他的线程中去执行。

### 案例二 - Kafka高性能IO

* Kafka的SDK，生产者调用SDK发送消息时，看起来只发了一条，但是SDK处理时不是一条一条发的，而是先把消息放到内存中缓存起来，攒一批，异步一块发给broker，broker会把这一批消息当成一条处理，消费者pull消息，也是一次性拉取这一批消息进行处理，从而减少Broker处理请求的次数，减轻压力。

  Kafka有一个参数可以让消费者阻塞直到有新消息到达 或者 消息堆积到一定数量之后才接收 ，避免Broker没有消息时，消费者不断轮询。
* 顺序读写，Broker收到消息后，顺序写入对应的log文件，一个文件写满就开启新文件顺序写下去，消费时也是从log文件的某一位置开始，顺序的读出来。
* 利用PageCache加速读写，数据写入文件时，先写入PageCache，再一批一批写入磁盘，读取文件时，也是先从PageCache中读取数据，当PageCache中没有数据，会引发os的缺页中断，读取先从会被阻塞，直到数据从文件中复制到PageCache，再从PageCache中读取数据。
* 零拷贝提升消费性能：一般来说，Broker处理消费时，会先从文件复制数据到PageCache，从PageCache复制数据到应用内存，从应用内存复制到Socket的缓冲区，发送数据。如果文件中的数据无需其他处理，可以使用零拷贝，直接将数据从PageCache复制到socket缓冲区，减少复制次数

## 常见问题

以下问题都是需要分具体的MQ的，这里简单说下通用方法。一般异常会出现的场景：

* 生产者：网络或内部问题导致消息发送失败，没发出去；或者是发出去了，但是没有接受成功响应导致重复发送。
* 消费者：消费者获取了消息，处理消息的过程中宕机，恢复时消息如何处理。
* 消息队列：宕机重启时会丢数据

### 如何利用事务消息实现分布式事务

以RocketMQ为例，生产者先发送一个半消息给MQ，此时的半消息对消费者不可见，完成之后执行本地事务，根据本地事务的执行结果，对刚刚发送的半消息进行commit或rollback，只有commit之后，消费者才能消费此消息。如果生产者本地事务提交成功后宕机，MQ会调用生产者提供的反查方法确认半消息的之后的执行状态，反查方法会有三个结果：提交、回滚、不确定，如果查询结果是不确定，MQ则会进行重试，直到查到确定的结果或者超重试次数。这种场景其实是默认消费者一定会消费成功的，如果要让消费者消费不成功时，生产者也回滚，那就不能用这种方案了。

### 如何保证高可用性

集群 + zookeeper + 负载均衡

以ActiveMQ为例（因为是主从架构）

使用ZooKeeper（集群）注册所有的ActiveMQ Broker。只有其中的一个Broker可以提供服务，被视为 Master，其他的 Broker 处于待机状态，被视为Slave。如果Master因故障而不能提供服务，Zookeeper会从Slave中选举出一个Broker充当Master。 Slave连接Master并同步他们的存储状态，Slave不接受客户端连接。所有的存储操作都将被复制到 连接至 Master的Slaves。如果Master挂了，得到了最新更新的Slave会成为 Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式

消息数据可以持久化或非持久化，在集群内共享，当有节点挂掉后，其他节点也可以通过这些共享的数据顶上。另外，也可通过定时任务定时对失败的消息进行补偿。

数据的持久化存储可以存储在文件系统 > 分布式KV > 分布式文件系统 > 数据库（速度上的排列，但可靠性就要反过来了）

### 如何解决消息重复问题，保证消息幂等性

消息幂等，可以保证即使消息被重复消费也无所谓，一般来讲，重复发送总是存在的，要避免的是即使重复消费也能保证业务正确。要解决消息重复发送，除非允许消息丢失，但很明显，消息不丢的优先级肯定更高。

消息重复消费需要解决3个问题：1.第一条消息已经消费成功了，第二条消息将直接成功；2.并发场景下，消息重复，击穿幂等挡板的问题；3.生产者发送重复的消息：重复的消息id一致或者不一致

**常见实现幂等性方法**：

保证消息幂等一般是在消费者做，比如业务操作本身就是幂等的，不然就是在消息本身上做文章：

* 消费者处理消息的时候将消息入库记录，利用数据库的唯一键约束 + 事务来实现，比如事务开启时，先把消息入库，再进行其他业务操作，然后再提交事务，保证消息只被消费一次；

  这种方案有局限性，一个它需要依赖关系型数据库，另一个是需要所有的操作都在同一个库里，保证在同一个库内，不允许跨库操作。
* 基于上面那种方案，如果涉及到跨服务、库的调用，则可以将跨服务的操作也抽成一个消息，通过将对不同服务或库的操作也改成事务+消息的方式来做，就可以解决这个问题，但是会导致增加业务的复杂度。
* 消息入库 + 消息TTL去重表 + 消息状态的方案：消费者接收到消息，插入消息去重表，并设置过期时间，如果插入成功，此时消息的状态为消费中，执行业务代码，成功后更新消息状态为已消费，然后发送ack，如果业务代码执行失败，则将消息从消息表删除，重新投递；如果消息一开始就插入失败了，判断消息的状态，如果消息状态是已消费，则返回ack，如果状态是消费中，则重新投递，之所以要多这一步是为了解决并发时候重复消息的问题；插入时设置过期时间，是为了解决消息重复投递时，每次都是看到消息是消费中的状态，导致消息最终去了死信的问题；

  当然此方案中如果涉及到跨服务、库的调用，需要保证这一次调用也是幂等的，该方案仅仅是解决上面的方案中依赖关系型数据库和事务的问题。
* 更新数据时设置前置条件，如果满足该条件则更新数据，否则拒绝更新数据，在更新数据的同时，变更前置条件中需要判断的数据，这样当重复执行这个操作时，由于第一次更新已经修改了前置条件，不满足前置条件时，则不会重复执行更新数据的操作，比如给数据增加一个版本号，每次更新数据前，比较当前数据的版本号是否和消息中的版本号一致，如果不一致就拒绝更新数据，更新数据的同时，版本号+1，实现幂等更新。
* 在发送消息时，给消息指定一个全局唯一的ID，消费时，先根据这个ID检查这条消息是否被消费过，如果没消费过，才更新数据，然后标记消息为已消费。但该方案在分布式系统中比较复杂，高可用，高性能都有一定的影响，需要使用分布式锁或者分布式事务
* 对于无法处理的消息导致MQ重复发送，可以设置重复次数，过了重复次数将消息持久化到其他地方，比如死信队列，以后处理。

### 如何保证消息顺序

100%的顺序消息的条件比较苛刻，需要允许消息丢失且生产者到消费者都是单点单线程，一般是保证在不丢消息的前提下，尽量减少重复消息，不保证投递顺序。。

允许重复消息时，可以通过**序列号**或者**状态机**来解决消息幂等处理和消息顺序的问题

* **序列号**：消费者接收消息时，只接收比最新序列号大的消息，重复消息因为序列号比当前序列号小，所以可以抛弃，比如消息顺序是123，消费者当前也收到123了，再收到123其中一个就可以不进行处理了；如果消息顺序不一致，比如消息顺序是123，但先接收到的是3，1和2还没收到，那就只能先把3存起来，等待12的到来，具体可以参考TCP协议的通信机制。
* **状态机**：将业务流程设定成一系列的状态扭转，不同的状态只能处理不同的消息，就可以依靠状态流的扭转来实现顺序消息

**减少重复消息的处理**

消息队列收到消费者的确认信号后，将消息id清除或进行标记；

发送方要进行重发前对消费者进行询问请求

### 如何保证消息的可靠性传输

消息传输有3个阶段：

1. 消息的传输包括生产者发送消息给消息队列
2. 消息在Broker端存储，如果是集群，消息会复制到其他副本
3. 消费者通过pull模式拉取数据 或者 消息队列通过push模式向消费者发送消息

保证消息可靠，一般是使用 **持久化机制 或 事务 + ack机制**

持久化不一定带有事务特性，比如直接日志落地，如果持久化要实现事务特性，可以使用分布式事务或者本地数据库事务。

消息的确认机制（ack机制），消息发送完成后，需要收到接收方的确认信号，确认信号的返回可以是收到后消息后就立即返回，比如默认auto ack机制，或者是接收方法接收到消息，处理完该消息后才发送确认信号。另外，当接收方无法处理消息，比如消费能力不够，网络不佳等情况，接收方也可以直接拒绝消息，等待发送方重新发送，所以这里就涉及到消息的重复发送了，通过直接拒绝消息来减少业务负担。

> ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言，只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK，可以在consumer（producer）与Broker之间建立一种简单的“担保”机制.

1. 消息发送阶段：生产者往Broker发送消息前，可以做一次消息持久化，收到Broker的ack响应后，要正确处理，以此保证消息在生产阶段不会丢失。在此阶段，Broker并不关心生产者是否收到ack响应，因为在生产者的角度，消息已经持久化后成功发出了，如果没有收到ack，最多就重复发送，那就会收到重复的ack，保证消息一定落到了Broker，当然前提是Broker能幂等处理。

   发送方只有在消息入库成功，事务提交后，才会发送消息，如果发送失败，可以靠定时任务重试。注意本地事务做的是业务落地和消息落地的事务。
2. 消息到达Broker阶段：对于单节点Broker，在收到消息后，将消息持久化到磁盘，再给生产者返回ack响应；对于集群Broker，在收到消息后，至少要把消息发送到两个以上节点，再给生产者返回ack响应。
3. 消息消费阶段：对于pull模式，消费者拉取消息后，先成功执行完业务逻辑后，才给Broker发送消费ack响应，如果Broker没有收到消费者的确认响应，下次拉取的仍然是同一条消息。

   支持广播的消息队列需要对每个待发送的endpoint，持久化一个发送状态，直到所有endpoint状态都OK才可删除消息。

   无论是pull模式还是push模式，在允许重复消息的情况下，还可通过定时任务轮询未消费消息发送给消费者处理来保证最终一致性。

保证消息从生产者到MQ或者MQ到消费者的过程在同一个会话中，保证原子性；在事务性会话中，当一个事务被提交的时候，确认自动发生；事务回滚，消息再次传送；一个事务提交才能进行下个事务，效率较差。

在非事务性会话中，消息何时被确认取决于创建会话时的应答模式ACK模式，分为自动确认(onMessage方法成功返回，如果抛异常会交由异常消息监听器，或者重复次数发送)、手动确认、不必须确认,批量(重复有标记)。

**检测消息丢失的方法**

通过消息队列的有序性来验证是否有消息丢失。生产者发出消息时附加一个连续递增的序号，由消费者来检查这个序号的连续性。在分布式系统上，在发送消息时必须指定分区，消费者在每个分区单独检查消息序号的连续性。

**消息的丢失处理**

生产者消息丢失处理：发送消息时产生一个id，MQ接收到消息后回传id，超过一定时间没收到则重发

MQ消息丢失处理：开启消息队列持久化 和 消息持久化，持久化后才回传id给生产者

消费者消息丢失处理：取消自动ack，在方法处理完之后调用方法，发送确认ack给MQ，如果消息处理的时间太长，但可能导致重复发送

### 关于pull模式和push模式

* push模式的弊端，如果消费者消费能力不够，就会导致消息在消息队列中堆积，消息队列也需要保存这些消息，记录这些消息的状态；而pull模式是消费者按能力消费，所以没有这样问题。
* push模式下要保证顺序消息也比较麻烦，需要等待消费者确认一个消息后才能发送下一个，吞吐量就不太行了
* pull模式的弊端，因为消费者拉取消息的时间间隔比较难把握，间隔时间不合理就会导致消息消费存在延迟和忙等，常见的作法是消费者建立连接后hold住一段时间，保存一个长连接，设置等待时间进行断开，在这段时间内进行消息消费

### 如何解决消息队列的延时以及过期失效问题？

手动查询丢失消息，重新导入

可以设置死信队列，过期或者重复多次为被消费的消息会进入死信队列

### 如何解决消息积压问题？

一般消息积压，要不就是发送快了，要不就是消费慢了，一般先查MQ监控，判断生产速度和消费速度是否异常。

* 生产者发送消息太慢也会导致积压，比如，生产者在一个事务内先发送半消息，处理业务逻辑，提交事务，事务成功后才会提交半消息，此时该消息才会被消费者所见，如果事务处理得太慢，也会造成消息堆积。

  so 生产者最好可以支持批量发送或者并行发送两种发送方式；对于实时性不强的业务，生产者可以积累一定量的消息才发送给MQ；对于批量发送的消息，消费时也要批量消费
* 修复消费者，检查消费者是不是对某一消息进行重复消费，恢复消费速度；
* 扩充原来的数量，消费之后再恢复原来架构。比如新键一个topic，建立比原先多n倍的队列，多n倍的消费者处理，每批消费者对应一个队列（分区），确保消费者的实例数和队列(分区)数相等。
* 如果可以持久化消息，也可以先丢弃消息，之后再将持久化消息导入队列再处理
* 服务降级，减少生产者的发送消息的数量

## 延迟队列

任意延迟消息有两个难点，一个是消息因为有不同的过期时间，且投递顺序不定，因此处理消息时需要排序，另一个是延迟消息的存储，因为如果MQ基于WAL方式实现，日志文件会被过期删除，其最多只能保存一段时间的数据。

1. 通过RabbitMQ的TTL消息 + 死信队列实现，消息的TTL过期之后，会被转发到死信，消费者直接消费死信队列里的消息即可。
2. 基于Kafka，由于Kafka本身不支持延迟消息，只能通过额外方案实现。

   **实现固定过期时间的延迟消息**：设置两个Topic，一个用于生产者投递设定好过期时间的消息，另一个是用于立即消费消息。生产者设定消息过期时间后，投递到第一个消息队列，消费者消费消息后判断是否需要立即执行，如果需要则把消息投递到第一个Topic，由真正的消费者执行消费。消费第一个队列的消费者如果判断出消息不能被投递到第二个队列，需要使用Kafka提供的 开始/暂停 API 来消费消息，不能使用sleep暂停消费消息，也不能重新投递（因为消息的顺序性），否则会导致Broker认为消费者不可用，触发 重平衡机制。

   缺点：消息只能是固定的延迟，如果每条消息的延迟时间不同，就得使用多种Topic和对应的消费者；延迟时间不宜过长，因为WAL机制，消息堆积可能会导致日志文件被删除

   ![](https://github.com/Nixum/Java-Note/raw/master/picture/Kafka%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97%E5%AE%9E%E7%8E%B0.png)
3. 多级时间轮优化排序；将普通消息和延迟消息的commitLog分开保存，以保存较长时间；延迟消息数据按固定延迟划分，分段加载，时间轮只加载临近到期的消息，比如每30分钟一个块写入文件，整点时计算下一个30分钟的消息hash到对应的时间轮上，而超过时间轮限制的，直接落盘。

   ![](https://github.com/Nixum/Java-Note/raw/master/picture/%E6%97%B6%E9%97%B4%E8%BD%AE%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97.png)

   > * 消息写入WAL
   > * Dispatcher处理延迟消息
   >   * 延迟消息一定时间的直接写入TimeWheel
   >   * 延迟超过一定时间写入DelayMessageStorage
   > * DelayMessageStorage对DelayMsgFile构建一层索引，这样在映射到TimeWheel时只需要做一次Hash操作
   > * 通过TimeWheel将消息投递到ConsumeQueue中完成对Consumer的可见

[几种延迟队列实现](https://zhuanlan.zhihu.com/p/266156267)

[如何在MQ中实现支持任意延迟的消息？](https://developer.aliyun.com/article/660438)

## 参考

[消息队列常见问题和解决方案](http://xuyangyang.club/articles/2018/07/23/1532348839398.html)

[如何从0到1设计一个MQ消息队列](http://youzhixueyuan.com/design-the-message-queue.html)

[ActiveMQ消息传送机制以及ACK机制详解](https://shift-alt-ctrl.iteye.com/blog/2020182)

[架构文摘：消息队列设计精要](https://blog.csdn.net/zhaobryant/article/details/80557103)

[消息队列设计精要](https://tech.meituan.com/2016/07/01/mq-design.html)，这个讲的不错

极客时间 - 消息队列高手课 - 李玥


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://nixum.gitbook.io/note/xiao-xi-dui-lie.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
