RabbitMQ,RocketMQ,Kafka的一些基础要点

队列模型和主题模型

早期的消息中间件是通过 队列 这一模型来实现的,可能是历史原因,我们都习惯把消息中间件成为消息队列。

但是,如今例如 RocketMQKafka 这些优秀的消息中间件不仅仅是通过一个 队列 来实现消息存储的。

就像我们理解队列一样,消息中间件的队列模型就真的只是一个队列。

image-20240620220918248

这种模型下,“广播”功能就不便实现了。也就是说如果我们此时我们需要将一个消息发送给多个消费者(比如此时我需要将信息发送给短信系统和邮件系统),这个时候单个队列即不能满足需求了。当然你可以让 Producer 生产消息放入多个队列中,然后每个队列去对应每一个消费者。问题是可以解决,创建多个队列并且复制多份消息是会很影响资源和性能的。而且,这样子就会导致生产者需要知道具体消费者个数然后去复制对应数量的消息队列,这就违背我们消息中间件的 解耦 这一原则。

主题模型(发布/订阅模型)就是为了解决这一问题而设计的。 //TODO⭐ 去了解观察者模式

在主题模型中,消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber) ,存放消息的容器称为 主题(Topic) 。发布者将消息发送到指定主题中,订阅者需要 提前订阅主题 才能接受特定主题的消息。

其实对于主题模型的实现来说每个消息中间件的底层设计都是不一样的,就比如 Kafka 中的 分区RocketMQ 中的 队列RabbitMQ 中的 Exchange 。我们可以理解为 主题模型/发布订阅模型 就是一个标准,那些中间件只不过照着这个标准去实现而已。

消息队列的作用

  1. 降低系统耦合性
    消息队列使用发布-订阅模式工作(只是工作模式之一),消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计

  2. 异步处理提高系统性能(减少响应所需时间)
    将用户请求中包含的耗时操作,通过消息队列实现异步处理,将对应的消息发送到消息队列之后就立即返回结果,减少响应时间,提高用户体验。随后,系统再对消息进行消费。因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。

  3. 削峰/限流
    先将短时间高并发产生的事务消息存储在消息队列中,将压力转嫁到MQ上,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击

image-20240528183034107

秒杀处理流程如下所述:​ 用户发起海量秒杀请求到秒杀业务处理系统。​ 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送 RocketMQ。​ 下游的通知系统订阅 RocketMQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。​ 用户收到秒杀成功的通知

  1. 顺序消息
    在很多应用场景中,处理数据的顺序至关重要。消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。例如 RockerMQ , 对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。顺序消息分为分区顺序消息全局顺序消息

  • 分区顺序消息:对于一个指定的 Topic ,所有消息根据 Sharding key 进行区块分区,同一分区内的消息按照严格的 FIFO 原则进行发布和消费,同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

    • 例如:用户注册需要发送验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
    • 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
  • 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。适用于对性能要求不高,所有消息按照 FIFO 的顺序进行发布和消费。

    全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。

  1. 分布式事务

  2. 延时/定时处理
    消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。

  3. 数据流处理
    针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。

消息队列带来的问题

  • 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!

  • 系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!

  • 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了。

  • 重复消费消息

  • 消息的消费顺序

  • 如何解决分布式事务

  • 消息堆积怎么办

image-20240528213638406

RabbitMQ

RabbitMQ架构如下

image-20240622154334050
  • Producer

  • Consumer

  • Exchange

    在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。

    Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

    生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效

    RabbitMQ 中通过 Binding(绑定)Exchange(交换器)Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。

    image-20240622155120664

    BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中

    Exchange Types

    • fanout:会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息

    • direct(默认):会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

    • topic

      direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

      • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;

      • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;

      • BindingKey 中可以存在两种特殊字符串“”和“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

      image-20240622160316614
    • headers(不推荐)

      headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

  • Queue

    多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。

  • Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。

  • 死信队列

    DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

    导致的死信的几种原因

    • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false

    • 消息 TTL 过期。

    • 队列满了,无法再添加。

  • 延迟队列:AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能

  • RabbitMQ的工作模式

    • 简单模式
    • work 工作模式
    • pub/sub 发布订阅模式
    • Routing 路由模式
    • Topic 主题模式
  • 如何保证消息的可靠性?

    消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。

    • 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。

    • RabbitMQ 自身:持久化、集群、普通模式、镜像模式。

    • RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。

  • 如何保证消息的顺序性?

    拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;

    或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

RocketMQ

分布式的应用必定涉及到多个系统之间的通信问题,可以说分布式的生产是消息队列的基础。

消息队列的作用:异步、解耦、削峰

组成

  • Producer、Consumer、Brocker、Topic、Queue、Message、NameServer

  • NameServer
    NameServer 是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。主要包括两个功能:

    • Brocker管理:NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据;然后提供心跳检测机制,检查 Broker 是否还存活;
    • 路由信息管理每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

    NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯,是去中心化的。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息

  • Brocker

    • Broker主要负责消息的存储投递查询以及服务高可用保证。

    • 一个 Topic 分布在多个 Broker上,一个 Broker 可以配置多个 Topic ,它们是多对多的关系

    • 如果某个 Topic 消息量很大,应该给它多配置几个队列(上文中提到了提高并发能力),并且 尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力

    • Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大。

    • NameServer 是去中心化无状态节点,一般集群部署,节点之间无任何信息同步。Broker 部署相对复杂。

    • Master-Slave 架构中,Broker 分为 Master 与 Slave。一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。如果 master 宕机,slave 提供消费服务,但是不能写入消息

    • 在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。

  • Topic 主题:代表一类消息,比如订单消息,物流消息等等。

  • Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。

    RocketMQ 服务端 5.x 版本开始,生产者是匿名的,无需管理生产者分组(ProducerGroup);对于历史版本服务端 3.x 和 4.x 版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。

  • Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。

​ 消费者分组中的订阅关系、投递顺序性、消费重试策略是一致的。

image-20240620225536982

RocketMQ的消息模型

RocketMQ 中的消息模型就是按照 主题模型 所实现的。

可以看到图中生产者组中的生产者会向主题发送消息,而 主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。

每个主题中都有多个队列(分布在不同的 Broker中,如果是集群的话,Broker又分布在不同的服务器中),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1Consumer2 分别对应着两个队列,而 Consumer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。当然也可以消费者个数小于队列个数,只不过不太建议。

image-20240621201519158

每个消费组在每个队列上维护一个消费位置

我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。

如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要),它仅仅是为每个消费者组维护一个 消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。

image-20240621201936624

为什么一个主题中需要维护多个队列

是为了提高并发量。一个Topic只包含一个队列也不是不行,只是并发量会下降很多。

如果每个主题中只存在一个队列,这个队列中也维护着每个消费者的消费位置,也可以做到发布订阅模式。

image-20240621202605477

但是,这种情况下,生产者只能向一个队列发送消息,又因为需要维护消费位置,所以一个队列只能对应一个消费者组中的消费者,这样一个消费者组中只有一个消费者在工作,并发量大大降低。所以总结来说,RocketMQ 通过使用在一个 Topic 中配置多个队列并且每个队列维护每个消费者组的消费位置 实现了 主题模式/发布订阅模式

总结:

  • 每个 BrokerNameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

  • ProducerNameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。

  • ConsumerNameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息

订阅关系一致性

订阅关系一致指的是同一个消费者分组Group ID下,所有Consumer实例所订阅的TopicTag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

  • 订阅的Topic必须一致,可以订阅多个 Topic,但订阅必须一致

    例如:Consumer1订阅TopicATopicBConsumer2也必须订阅TopicATopicB,不能只订阅TopicA、只订阅TopicB或订阅TopicATopicC

  • 订阅的同一个Topic中的Tag必须一致,包括Tag的数量和Tag的顺序。

    例如:Consumer1订阅TopicBTagTag1||Tag2Consumer2订阅TopicBTag也必须是Tag1||Tag2,不能只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1。

  • 订阅多个TopicTopic的类型一致。

    例如,Consumer1和Consumer2都同时订阅TopicA和TopicB,则这两个Topic的类型必须一致,必须都是普通消息或者都是顺序消息。云消息队列 RocketMQ 版支持的消息类型,请参见消息类型列表

消息的类型

普通消息

普通消息一般应用于微服务解耦事件驱动数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

普通消息的生命周期:

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时消息

在分布式定时调度触发任务超时处理等场景,需要实现精准、可靠的定时事件触发

定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致

定时消息的生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达,此时对下游消费者是不可见的。

  • 待消费定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。

  • 消费中

  • 消费提交

  • 消息删除

顺序消息

顺序消息仅支持使用 MessageType 为 FIFO 的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。和普通消息发送相比,**顺序消息发送必须要设置消息组。**要保证消息的顺序性需要单一生产者串行发送。

单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费。

事务消息

用于在分布式场景下保障消息生产和本地事务的最终一致性。简单来讲,就是将本地事务(数据库的 DML 操作)与发送消息合并在同一个事务中

  • 同步消息

  • 异步消息

  • 单向消息

  • 延时和定时消息

  • 事务消息

  • 批量消息

  • 顺序消息

  • 带标签的消息、消息过滤:不同的业务应该使用不同的Topic,如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分

  • 如何确保消息不丢失? ⭐ TODO

消费者的分类

PushComsumer

SimpleConsumer

PullComsumer

RocketMQ刷盘机制

RocketMQ 命名规范

image-20240611193823692

使用规范

消息发送
  • 【强制】定义生产者时,必须为其指定生产者组

  • 【强制】一个系统应对应一个 Topic ,系统下的不同业务根据 Tag 细分,参考申请规范-消费应用 Tag。

  • 【强制】发送消息时,需设置 KEYS。KEYS 建议定义为业务唯一标识,比如订单 ID。

  • 【强制】发送消息不管发送成功或失败,需打印 KEYSPayload、执行时间以及 SendResult

  • 【强制】发送消息时,需设置超时时间,避免应用被拖垮;建议超时时间设置为 2000ms 内。

  • 【建议】针对可靠性较高的消息,发送失败后可以存储到 DB,开启定时任务扫描,并重新投递。

消息消费
  • 【强制】消费端创建时,必须指定消费者组。

  • 【强制】消费端需要保证数据幂等

  • 【强制】消费消息不管成功或失败,需打印 KEYSMsgId、执行时间以及 Message

  • 【强制】不同的应用集群应使用不同的消费者组,如果不同的应用集群需要订阅同一消费者组,需保证 Topic Tag 订阅关系一致。

  • 【建议】消费时尽量不设置重试,大部分情况下,执行失败的消息重试后会再次失败,反而会影响消费进度。开发者应该针对特定场景在代码中设置重试逻辑。

  • 【建议】消费者并发消费数量默认为 1,即串行化,应该基于不同系统场景来设置并发数,同时要考虑消费过程中其它组件的压力。

    • 系统 CPU 任务少:CPU 核数 / (1 - 阻塞系数 0.8)
    • 系统 CPU 任务较多,建议 CPU 核数 + 1 即可。

面试题

发布订阅模型

如何解决消息重复消费问题

消息重复原因:发送时重复、消费时重复

可以通过给消费者实现 幂等特性,来保证消息不会被重复消费 ,也就是对同一个消息的处理结果,执行多少次都不变。

两个要素:幂等令牌、唯一性处理

如何解决消息的顺序消费问题?

首先分析一下为什么会出现消费顺序问题:

RocketMQ的队列选择法有:

轮询法

最小延迟投递法(producer.setSendLatencyFaultEnable(true);)

继承MessageQueueSelector 实现

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//从mqs中选择一个队列,可以根据msg特点选择
return null;
}
}, new Object());

生产者生产了一系列的有序消息(比如同一个订单的创建、支付、发货),消费者需要按照一定的顺序去消费消息。但是在发布订阅模型中,默认情况下生产者生产消息是按照轮询进行负载均衡,将多个消息轮流发送到不同队列的,这种情况下,因为消息分布在多个队列中,就难以维护对消息的顺序消费了。所以,要实现顺序消费,需要将同一语义的消息发送到同一队列中,可以使用Hash取模法来保证同一个订单的消息发送到同一个队列中。

如何解决分布式事务问题

基于消息的分布式事务 + 事务反查机制(用于查看本地事务的执行状态)+ 事务补偿机制(用于回滚)

就拿用户购票的分布式系统来说,用户购票完成之后是不是需要增加账户积分?在同一个系统中我们一般会使用事务来进行解决,如果用 Spring 的话我们在上面伪代码中加入 @Transactional 注解就好了。但是在不同系统中如何保证事务呢?总不能这个系统我扣钱成功了你那积分系统积分没加吧?或者说我这扣钱明明失败了,你那积分系统给我加了积分。

如今比较常见的分布式事务实现有 2PCTCC事务消息(half 消息机制)。

RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。

image-20240621215005426

如何解决消息堆积问题?

MQ的一个重要功能是削峰,但如果峰值过大,转移到了MQ中,导致消息堆积怎么办呢?

产生消息堆积的根源其实就只有两个——生产者生产太快或者消费者消费太慢。

当流量到峰值的时候是因为生产者生产太快,我们可以使用一些 限流降级 的方法,当然你也可以增加多个消费者实例去水平扩展增加消费能力来匹配生产的激增。如果消费者消费过慢的话,我们可以先检查 是否是消费者出现了大量的消费错误 ,或者打印一下日志查看是否是哪一个线程卡死,出现了锁资源不释放等等的问题。

当然,最快速解决消息堆积问题的方法还是增加消费者实例,不过 同时你还需要增加每个主题的队列数量

如何实现定时消息? ⭐TODO

Kafka

Kafka 是一个分布式流式处理平台。

流平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。

  2. 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。

  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 主要有两大应用场景:

  1. 消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。

  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

Kafka的优势:吞吐量大,每秒可以处理千万级别的消息;生态系统兼容性好。

Kafka的消息模型

RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)

kafka的架构图

image-20240623204606259

Kafka 比较重要的几个概念:

  1. Producer(生产者) : 产生消息的一方。

  2. Consumer(消费者) : 消费消息的一方。

  3. Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。

同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。

  • Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。

多副本机制

Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。

Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?

Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。

Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。

ZooKeeper和Kafka

ZooKeeper 主要为 Kafka 提供元数据的管理的功能。

Zookeeper 主要为 Kafka 做了下面这些事情:

  1. Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去

  2. Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0/brokers/topics/my-topic/Partitions/1

  3. 负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 **对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。**当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

顺序消费

我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。

每次添加消息到 Partition(分区) 的时候都会采用尾加法。 **Kafka 只能为我们保证 Partition(分区) 中的消息有序。**消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

如何确保消息不丢失?

生产者丢失消息

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。

所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}

但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(
result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage())
);

如果消息发送失败的话,我们检查失败的原因之后重新发送即可

消费者丢失消息

我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。这时候就要对消费者进行幂等性设计了。

Kafka弄丢了消息

我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

  • 设置 acks = all

    acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高.

  • 设置 replication.factor >= 3

    为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

  • 设置 min.insync.replicas > 1

    一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

  • 设置 unclean.leader.election.enable = false

    我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

如何确保不重复消费?

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。

  • Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  • 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。

  • enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:

    什么时候提交 offset 合适?

    • 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样

    • 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。

kafka重试机制

Kafka消费者消费失败的时候会触发重试机制。

在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。

默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?

Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。

自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
// 自定义重试时间间隔以及次数
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));
factory.setConsumerFactory(consumerFactory);
return factory;
}

自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandlerhandleRemaining 函数,加上自定义的告警等操作。

@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {

public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}

@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}

DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等

重试失败后的数据如何再次处理

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。

@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。

// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}

RocketMQ和Kafka的选择

RockerMQ

  • 优点

    1. 支持多种消费方式(consumer pull, broker push)
    2. broker可以进行消息过滤
    3. 支持消息顺序消费
    4. consumer可水平扩展,消费能力很强
    5. 支持事务
  • 缺点

    1. 相比于kafka,使用者较少,生态不够完善。消息堆积、吞吐率上也有所不如。
    2. 不支持主从自动切换,master失效后,消费者需要一定的时间才能感知。
    3. 客户端只支持Java

Kafka:

  • 优点

    1. 高吞吐、低延迟、高可用、集群热扩展、集群容错
    2. producer端提供缓存、压缩功能,可节省性能,提高效率。
    3. 提供顺序消费能力
    4. 生态完善,在大数据处理方面有大量配套的设施。
  • 缺点

    1. 消息容易丢失,适合做日志系统,不适合做数据系统。
    2. 消费集群数目受到分区数目的限制。
    3. 单机topic多时,性能会明显降低。单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
    4. 不支持事务
    5. 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
    6. 消费失败不支持重试