RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

消息队列

什么是消息队列

消息队列(Message Queue):简称MQ,顾名思义就是一个传递消息且符合队列原则(先进先出)的中间件;

一般有两个角色:生产者(负责生产并发送消息),消费者(负责接收并消费消息);

为什么要使用消息队列

消息队列一般有三种使用场景

解耦

解耦就是解除系统的耦合度,比如现在有一个购买场景,需要减库存扣款生成订单,三项操作。
不使用消息队列操作的话就需要依次调用三项操作,这样的话,购买这边就与三个系统关联耦合起来了。
使用消息队列,购买只需要发送三个消息,然后消费者接受消息并进行业务逻辑操作,这样对于购买来说,只需要发送指令,哪个项目进行业务处理不需要关心,如何处理也不需要关心,这样就解除了耦合度。

异步

还是上面的购买场景,假如现在扣款操作耗时很久,就可以使用消息队列异步操作,购买接口的耗时也就只是发送消息的耗时,实际业务处理是异步处理。

削峰/限流

比如现在有抢购场景,同一个接口有数十万用户调用,服务器压力过大,直接崩溃。
可以使用消息队列进行削峰,消息队列中的数据会依次被消费,且不会突然给出量大到服务器无法处理的消息。让服务可以慢慢消费,牺牲时间,可以保证服务器可用性。

队列模型与主题模型

队列模型

也叫点对点式消息队列,客户端分为生产者和消费者,队列中的消息只能被一个消费者消费,也可以说一个生产者对一个消费者。

主题模型

也叫订阅式消息队列,类似设计模式中的观察者模式,有兴趣的可以查看Java设计模式篇
在主题模型中,消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber) ,存放消息的容器称为 主题(Topic)
其中,发布者将消息发送到指定主题中,订阅者需要 提前订阅主题 才能接受特定主题的消息。

消息队列会产生什么问题

没有哪一门技术是“银弹”,消息队列也有它的副作用。

  1. 原本直接调用的代码现在添加一个消息中间件,无疑增加了系统的复杂度,给开发维护造成了困扰。
  2. 可用性,如果消息队列挂了,那相应的系统也无法正常运行。
  3. 可靠性
    1. 如果在消息发送过程中,消息队列挂了,是否会丢失消息。
    2. 消息重复消费(幂等性)
    3. 分布式事务问题(多消费者中任意一个出现错误则需要全部回滚)
    4. 如果消费者过于缓慢,生产者消息又多,这样又会造成消息堆积

RocketMQ

RocketMQ是什么

RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。

RocketMQ架构组成

架构图

RocketMQ

架构组成部分

NameServer集群

主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。

  • NameServer是一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交互。
  • NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。
  • 但有一点需要注意,Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败。
  • NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。
  • 每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

Producer

消息生产者,负责生产消息,并将消息发送给Broker集群,一般由业务系统作为生产者。

  • Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
  • RocketMQ 提供了三种方式发送消息:同步、异步和单向
  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

Broker

消息中转角色,负责存储消息,转发消息。

  • Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
  • Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
  • 官网上有数据显示:具有上亿级消息堆积能力,同时可严格保证消息的有序性

Consumer

消息消费者,负责消费消息(实际业务处理),一般由后台系统作为消费者。

  • Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费广播消息,提供实时的消息订阅机制
  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

消息领域模型

Message

Message(消息)就是要传输的信息。
一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。
一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

Topic

Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。
Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。

Tag

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有Tag
标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

Group

Group(分组),一个分组可以订阅多个Topic。
分为ProducerGroup(生产者组)ConsumerGroup(消费者组),比如多个秒杀消息发送者集群部署,那么这些服务合起来就是一个 秒杀生产者组。

Queue

在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。

Message Queue

Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。
一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。
消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

Offset

在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

消息顺序

消息顺序分为两种:Orderly(顺序消费),按照先进先出依次消费。Concurrently(并行消费),无顺序并行消费,并行数量受每个消费者指定的线程池数量限制。

完整通讯流程

  1. 生产者从NameServer集群中随机选择一个节点建立长链接。
  2. 定期从NameServer中获取Broker Master信息并建立长连接,并定期向Broker发送心跳。
  3. 生产者发送一条消息到Broker Master
  4. 消费者从Broker集群中获取到消息
  5. 消费消息

常见问题

优缺点

优点

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到0丢失
  • 功能支持:MQ功能较为完善,还是分布式的,扩展性好
  • 支持10亿级别的消息堆积,不会因为堆积导致性能下降
  • 天生为互联网金融行业而生,历经多次双十一磨炼。可靠性高

缺点

  • 客户端支持语言不多,目前只是Java和C++ 。
  • 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

消息去重

原则:使用业务端逻辑判断保持幂等性问题。

幂等性:就是说同参数下消费端无论消费多少次都和消费一次结果是一样的。

去重策略:每一条消息都有唯一的编号(唯一流水号),且保证消息处理结束后会记录这个唯一流水号

消息的可用性

当我们选择好了集群模式之后,那么我们需要关心的就是怎么去存储和复制这个数据,RocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。

除了存储有选择之后,我们的主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右。

RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

Kafka采用的是独立型的存储结构,每个队列一个文件。

RocketMQ 刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。

异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。

分布式事务

Half Message(半消息)

是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer

对消息的二次确认后,Consumer才能去消费它。

消息回查

由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会

主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查

  1. A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
  2. 当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
  3. 执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
  4. 如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
  5. 如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
  6. 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。

例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:

  • 消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
  • 消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
  • 评估消息堆积能力主要有以下四点:
  • 消息能堆积多少条,多少字节?即消息的堆积容量。
  • 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
  • 消息堆积后,正常消费的Consumer是否会受影响?
  • 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?

定时消息

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。

如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。