封面来源:本文封面来源于网络,如有侵权,请联系删除。

本文参考:【尚硅谷】2021最新版RocketMQ教程丨轻松入门分布式消息系统

1. MQ 概述

1.1 MQ 的简介

MQ,Message Queue,是一种提供 消息队列服务 的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API 的软件系统。消息即数据(消息流、数据流),一般来说,消息的体量不会很大。

1.2 MQ 的用途

对于 MQ 的用途来说,总结起来就是以下三点:

1.2.1 限流削锋

MQ 可以暂存系统的超量请求,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

MQ限流削峰-1

MQ限流削峰-2

1.2.2 异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。使用异步调用可以解决这些问题。如果两层之间想要实现由同步到异步的转化,一般的做法就是在这两层之间添加一个 MQ 层。

MQ异步解耦

1.2.3 数据收集

分布式系统会产生海量的数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,是当前互联网平台的必备技术,而通过 MQ 完成此类数据收集是最好的选择。

1.3 常见 MQ 产品

ActiveMQ

ActiveMQ 是使用 Java 语言开发的一款 MQ 产品,早期很多公司与项目中都在使用,但由于现在的社区活跃度很低,在项目中已经很少使用了。

RabbitMQ

RabbitMQ 是使用 ErLang 语言开发的一款 MQ 产品。其吞吐量较 Kafka 与 RocketMQ 而言要低,且由于未使用 Java 语言开发,所以公司内部对其实现定制化开发难度较大。

Kafka

Kafka 是使用 Scala / Java 语言开发的一款 MQ 产品。其最大的特点就是高吞吐率,常用于大数据领域的实时计算、日志采集等场景。它没有遵循任何常见的 MQ 协议,而是采用自研协议。Spring Cloud Netflix 仅支持 RabbitMQ 与 Kafka。

RocketMQ

RocketMQ 是使用 Java 语言开发的一款 MQ 产品。经过数年双 11 的考验,性能与稳定性非常高。它也没有遵循任何常见的 MQ 协议,采用自研协议。对于 Spring Cloud Alibaba,其支持RabbitMQ、Kafka,但更提倡使用 RocketMQ。

MQ 之间的对比

关键词 ActiveMQ RabbitMQ Kafka RocketMQ
开发语言 Java ErLang Java Java
单机吞吐量 万级 万级 十万级 十万级
Topic - - 百级 Topic 时会影响系统吞吐量 千级 Topic 时会影响系统吞吐量
社区活跃度

Kafka、RabbitMQ 和 RocketMQ 的详细对比:

三大消息中间件的详细对比

1.4 MQ 常见协议

一般情况下,MQ 的实现是要遵循一些常规性协议的(RocketMQ 都不支持 😂),比如:

1.4.1 JMS

JMS,Java Messaging Service(Java消息服务),是 Java 平台上有关 MOM(Message Oriented Middleware,面向消息的中间件)的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。ActiveMQ 是该协议的典型实现。

1.4.2 STOMP

STOMP,Streaming Text Orientated Message Protocol(面向流文本的消息协议),是一种 MOM 设计的简单文本协议。STOMP 提供一个可互操作的连接格式,允许客户端与任意 STOMP 消息代理(Broker)进行交互。ActiveMQ 是该协议的典型实现,RabbitMQ 通过插件可以支持该协议。

1.4.3 AMQP

AMQP,Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种 MOM 设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品、不同开发语言等条件的限制。 RabbitMQ 是该协议的典型实现。

1.4.4 MQTT

MQTT,Message Queuing Telemetry Transport(消息队列遥测传输),是 IBM 开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗 IoT(物联网)设备间的通信。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。 RabbitMQ 通过插件可以支持该协议。

2. RocketMQ 概述

2.1 RocketMQ 简介

RocketMQ官网图片

PS:上述图片信息截止 2021-7-21。

RocketMQ 是一个统一消息引擎、轻量级数据处理平台。

RocketMQ 是⼀款阿⾥巴巴开源的消息中间件。2016 年 11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ 孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。

官⽹地址:Apache RocketMQ

2.2 RocketMQ 发展历程

RocketMQ演进历史

2007 年,阿里开始五彩石项目,Notify 作为项目中 交易核心消息流转系统 应运而生。Notify 系统就是 RocketMQ 的雏形。

2010 年,B2B 大规模使用 ActiveMQ 作为阿里的消息内核。阿里急需一个具有海量堆积能力的消息系统。

2011 年初,Kafka 开源。淘宝中间件团队在对 Kafka 进行了深入研究后,开发了一款新的MQ,MetaQ。

2012 年,MetaQ 发展到了 v3.0 版本,在它基础上进行了进一步的抽象,形成了 RocketMQ,然后就将其进行了开源。

2015 年,阿里在 RocketMQ 的基础上,又推出了一款专门针对阿里云上用户的消息系统 Aliware MQ。

2016 年双十一,RocketMQ 承载了万亿级消息的流转,跨越了一个新的里程碑。11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。

2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。

3. RocketMQ 的安装与启动

3.1 基本概念

3.1.1 消息

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

3.1.2 主题

RocketMQ的Topic

Topic 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。一个 Topic 可以有多个 Message,而一个 Message 只属于一个 Topic。

一个生产者可以同时发送多种 Topic 的消息,而一个消费者只对某种特定的 Topic 感兴趣,即只可以订阅和消费一种 Topic 的息。

简单来说,可以通过 Topic 对不同的业务消息进行分类。

3.1.3 标签

为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Topic 是消息的一级分类,Tag 是消息的二级分类。

Topic 与 Tag 的关系图示:

RocketMQ中Topic和Tag的关系

那什么时候使用 Topic,什么时候使用 Tag 呢?[1]

1、消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。

2、业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。

3、消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。

4、消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

3.1.4 队列

存储消息的物理实体。一个 Topic 中可以包含多个 Queue,每个 Queue 中存放的就是该 Topic 的消息。一个 Topic 的 Queue 也被称为一个 Topic 中消息的分区(Partition)。

一个 Topic 的 Queue 中的消息只能被一个消费者组中的一个消费者消费。一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

将消息分区、分 Queue 存放,可以提升 RocketMQ 的整体读写效率。

RocketMQ的Queue

在学习参考其它相关资料时,还会看到一个概念:分片(Sharding)。分片不同于分区。在 RocketMQ 中,分片指的是存放相应 Topic 的 Broker。每个分片中会创建出相应数量的分区,即 Queue,每个 Queue 的大小都是相同的。

RocketMQ的Broker

3.1.5 消息标识

RocketMQ 中每个消息拥有“唯一的”(可能会重复) MessageId,且可以携带具有业务标识的 Key,以方便对消息的查询。

不过需要注意的是,MessageId 有两个:在生产者 send() 消息时会自动生成一个 MessageId(msgId),当消息到达 Broker 后,Broker 也会自动生成一个 MessageId(offsetMsgId)。msgId、offsetMsgId 与 key 都称为消息标识。

1、msgId:由 Producer(生产者)端生成,其生成规则为:producerIp + 进程 pid + MessageClientIDSetter 类的 ClassLoader 的 hashCode + 当前时间 + AutomicInteger 自增计数器。也有一定的几率会重复。

2、offsetMsgId:由 Broker 端生成,其生成规则为:brokerIp + 物理分区的 offset(Queue 中的偏移量)。重复的可能性很大。

3、key:由用户指定的业务相关的唯一标识。由用户控制,可以保证唯一性。

3.2 系统架构

RocketMQ的系统架构

RocketMQ 在架构上主要分成 4 个部分:

3.2.1 Producer

Producer(消息生产者) 负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

例如,业务系统产生的日志写入到 MQ 的过程,就是消息生产的过程;电商平台中用户提交的秒杀请求写入到 MQ 的过程,也是消息生产的过程。

RocketMQ 中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类 Producer 发送相同 Topic 类型的消息。一个生产者组可以同时发送多个主题的消息。比方说,现有一个消费者组,其中有四个消费者,1 号消费者可以生产 A、B、C、D 四种主题的消息,2 号与 3 号消费者也可以生产这四种,但 4 号消费者就只能生产 A、B、C 三种主题的消息。

3.2.2 Consumer

Consumer(消息消费者)负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理。

例如,QoS 系统从 MQ 中读取日志,并对日志进行解析处理的过程就是消息消费的过程;电商平台的业务系统从 MQ 中读取到秒杀请求,并对请求进行处理的过程也是消息消费的过程。

RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类 Consumer 消费的是同一个 Topic 类型的消息。消费者组使得在消息消费方面,实现负载均衡(将一个 Topic 中的不同的 Queue 平均分配给同一个 Consumer Group 的不同的 Consumer,这并不是将消息负载均衡,而是将 Queue 负载均衡,与 Kafka 一样)和容错(一个 Consmer 挂了,该 Consumer Group 中的其它 Consumer 可以接着消费原 Consumer 消费的 Queue)的目标变得非常容易。

RocketMQ的Consumer-1

消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量。 如果超出 Queue 数量,那么多出的 Consumer 不能消费消息(一个 Queue 只允许一个消费者消费)。

RocketMQ的Consumer-2

不过,一个 Topic 类型的消息可以被多个消费者组同时消费。

注意:

1、消费者组只能消费一个 Topic 的消息,不能同时消费多个 Topic 消息;

2、一个消费者组中的消费者必须订阅 完全相同 的 Topic。

3.2.3 NameServer

功能介绍

NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现。

RocketMQ 的思想来自于 Kafka,而 Kafka 是依赖了 Zookeeper 的,所以在 RocketMQ 的早期版本也是依赖于Zookeeper的。从 MetaQ v3.0,即 RocketMQ 开始去掉了 Zookeeper 依赖,转而使用了自己的 NameServer。

NameServer 主要包括两个功能:

1、Broker 管理。接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查 Broker 是否还存活。

2、路由信息管理。每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Conumser 通过 NameServer 可以获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。

路由注册

NameServer 通常也是以集群的方式部署,不过 NameServer 是无状态的,即 NameServer 集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。

那各节点中的数据是如何进行数据同步的呢?

在 Broker 节点启动时,轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在 NameServer 内部维护着⼀个 Broker 列表,用来动态存储 Broker 的信息。这是与其它像 ZooKeeper、Eureka、Nacos 等注册中心不同的地方。

NameServer 的这种无状态方式,有什么优缺点:

优点:NameServer 集群搭建简单,扩容简单。

缺点:对于 Broker,必须明确指出所有 NameServer 地址,而未指出的将不会去注册。正因如此,NameServer 并不能随便扩容。若 Broker 不重新配置,新增的 NameServer 对于 Broker 来说是不可见的,其不会向这个 NameServer 进行注册。

Broker 节点为了证明自己是活着的,为了维护与 NameServer 间的长连接,会将最新的信息以心跳包的方式上报给 NameServer,每 30 秒发送一次心跳。心跳包中包含 BrokerId、Broker 地址( IP + Port)、Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新心跳时间戳,记录这个 Broker 的最新存活时间。

路由剔除

由于 Broker 关机、宕机或网络抖动等原因,NameServer 没有收到 Broker 的心跳,NameServer 可能会将其从 Broker 列表中剔除。

NameServer 中有⼀个定时任务,每隔 10 秒就会扫描⼀次 Broker 表,查看每一个 Broker 的最新心跳时间戳距离当前时间是否超过 120 秒,如果超过,则会判定 Broker 失效,然后将其从 Broker 列表中剔除。

扩展:对于 RocketMQ 日常运维工作,例如 Broker 升级,需要停掉 Broker 的工作。OP 需要怎么做?

OP 需要将 Broker 的读写权限禁掉。一旦 Client(Consumer 或 Producer)向 Broker 发送请求,都会收到 Broker 的 NO_PERMISSION 响应,然后 Client 会进行对其它 Broker 的重试。当 OP 观察到这个 Broker 没有流量后,再关闭它,实现 Broker 从 NameServer 的移除。

名词解析:OP,Operations,即:运维工程师;SRE,Site Reliability Engineer,现场可靠性工程师。

路由发现

RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每 30 秒会拉取一次最新的路由。

扩展:

1、Push 模型:推送模型。其实时性较好,是一个“发布-订阅”模型,需要维护一个长连接, 而长连接的维护是需要资源成本的。该模型适合于的场景:① 实时性要求较高;② Client 数量不多,Server 数据变化较频繁。

2、Pull 模型:拉取模型。存在的问题是,实时性较差。

3、Long Polling 模型:长轮询模型。其是对 Push 与 Pull 模型的整合,充分利用了这两种模型的优势,屏蔽了它们的劣势。

客户端 NameServer 选择策略

PS:这里的客户端指的是 Producer 与 Consumer。

客户端在配置时必须要写上 NameServer 集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?

客户端首先会生产一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用 round-robin 策略,逐个尝试着去连接其它节点。也就是说, 首先采用随机策略进行的选择,失败后再采用是轮询策略。

扩展:ZooKeeper Client 是如何选择 ZooKeeper Server 的?

简单来说就是,经过两次 Shuffle,然后选择第一台 ZooKeeper Server。详细说就是,将配置文件中的 ZK Server地址进行第一次 Shuffle,然后随机选择一个,这个选择出的一般都是一个 hostname。然后获取到该 hostname 对应的所有 IP,再对这些 IP 进行第二次 Shuffle,从 Shuffle 过的结果中取第一个 Server 地址进行连接。

3.2.4 Broker

功能介绍

Broker 充当着消息中转角色,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker 同时也存储着消息相关的元数据,包括消费者组消费进度偏移 offset、主题、队列等。

PS:Kafka 0.8 版本之后,offset 是存放在 Broker 中的,而之前版本是存放在 ZooKeeper 中的。

模块构成

RocketMQ的Broker模块构成

Remoting Module:整个 Broker 的实体,负责处理来自 Clients 端的请求。这个 Broker 实体又由以下模块构成:

1、Client Manager:客户端管理器。负责接收、解析客户端(Producer / Consumer)请求,管理客户端。例如,维护 Consumer 的 Topic 订阅信息。

2、Store Service:存储服务。提供方便简单的 API 接口,处理消息存储到物理硬盘和消息查询功能。

3、HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。

4、Index Service:索引服务。根据特定的 Message key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。

集群部署

RocketMQ的系统架构

为了增强 Broker 性能与吞吐量,Broker 一般都是以集群形式出现的。各集群节点中可能存放着相同 Topic 的不同 Queue。

不过,这里有个问题,如果某 Broker 节点宕机,如何保证数据不丢失呢?

其解决方案是:将每个 Broker 集群节点进行横向扩展,即将 Broker 节点再建为一个 HA 集群(高可用集群),解决单点问题。

Broker 节点集群是一个主从集群,即集群中具有 Master 与 Slave 两种角色。Master 负责处理读写操作请求,Slave 负责对 Master 中的数据进行备份。当 Master 挂掉了,Slave 则会自动切换为 Master 去工作,所以这个 Broker 集群是主备集群。一个 Master 可以包含多个 Slave,但一个 Slave 只能隶属于一个 Master。

Master 与 Slave 的对应关系是通过指定相同的 BrokerName、不同的 BrokerId 来确定的。BrokerId 为 0 表示 Master,非 0 表示 Slave。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

3.2.5 工作流程

具体流程

1、启动 NameServer,NameServer 启动后开始监听端口,等待 Broker、Producer、Consumer 连接;

2、启动 Broker 时,Broker 会与所有的 NameServer 建立并保持长连接,然后每 30 秒向 NameServer 定时发送心跳包。

3、发送消息前,可以先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,当然,在创建 Topic 时也会将 Topic 与 Broker 的关系写入到 NameServer 中。不过,这步是可选的,也可以在发送消息时自动创建 Topic。

4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取路由信息,即当前发送的 Topic 消息的 Queue 与 Broker 的地址(IP + Port)的映射关系。然后根据算法策略从队列选择一个 Queue,与队列所在的 Broker 建立长连接从而向 Broker 发消息。当然,在获取到路由信息后,Producer 会首先将路由信息缓存到本地,再每 30 秒从 NameServer 更新一次路由信息。

5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取其所订阅 Topic 的路由信息,然后根据算法策略从路由信息中获取到其所要消费的 Queue,然后直接跟 Broker 建立长连接,开始消费其中的消息。Consumer 在获取到路由信息后,同样也会每 30 秒从 NameServer 更新一次路由信息。不同于 Producer 的是,Consumer 还会向 Broker 发送心跳,以确保 Broker 的存活状态。

Topic 的创建模式

手动创建 Topic 时,有两种模式:

1、集群模式。该模式下创建的 Topic 在该集群中,所有 Broker 中的 Queue 数量是相同的。

2、Broker 模式。该模式下创建的 Topic 在该集群中,每个 Broker 中的 Queue 数量可以不同。

自动创建 Topic 时,默认采用的是 Broker 模式,会为每个 Broker 默认创建 4 个 Queue。

读 / 写队列

从物理上来讲,读 / 写队列是同一个队列。所以,不存在读 / 写队列数据同步问题。读 / 写队列是逻辑上进行区分的概念。一般情况下,读 / 写队列数量是相同的。

例如,创建 Topic 时设置的写队列数量为 8,读队列数量为 4,此时系统会创建 8 个 Queue,分别是 0 1 2 3 4 5 6 7。Producer 会将消息写入到这 8 个队列,但 Consumer 只会消费 0 1 2 3 这 4 个队列中的消息,4 5 6 7中的消息是不会被消费到的。

再如,创建 Topic 时设置的写队列数量为 4,读队列数量为 8,此时系统会创建 8 个 Queue,分别是 0 1 2 3 4 5 6 7。Producer 会将消息写入到 0 1 2 3 这 4 个队列,但 Consumer 只会消费 0 1 2 3 4 5 6 7 这 8 个队列中的消息,但是 4 5 6 7 中是没有消息的。此时假设 Consumer Group 中包含两个 Consuer,Consumer1 消费 0 1 2 3,而 Consumer2 消费 4 5 6 7。但实际情况是,Consumer2 是没有消息可消费的。

也就是说,当读 / 写队列数量设置不同时,总是有问题的。那么,为什么要这样设计呢?

其这样设计是为了方便 Topic 的 Queue 的缩容。

例如,原来创建的 Topic 中包含 16 个 Queue,如何能够使其 Queue 缩容为 8 个,还不会丢失消息?

可以动态修改写队列数量为 8,读队列数量不变。此时新的消息只能写入到前 8 个队列,而消费都消费的却是 16 个队列中的数据。当发现后 8 个 Queue 中的消息消费完毕后,就可以再将读队列数量动态设置为8。整个缩容过程,没有丢失任何消息。

perm 用于设置对当前创建 Topic 的操作权限:2 表示只写,4 表示只读,6 表示读写。

3.3 单机安装与启动

PS: 本次安装将安装在阿里云 ECS 上。

3.3.1 准备工作

软硬件需求

安装RocketMQ的软硬件需求

根据官方文档的描述[2],安装的软硬件需求如下:

1、64 位 Linux、Unix、Mac 操作系统;

2、64 位 JDK 1.8 及其以上;

3、Maven 版本 3.2.x;

4、Git;

5、4G 及其以上的磁盘存储空间;

注意:RocketMQ 的启动需要 JDK 与 Maven 的支持,因此一定要先在 Linux 上安装 Maven 和 JDK。

下载安装包

进入 RocketMQ 官网,点击首页的 RocketMQ 版本:

点击RocketMQ当前版本

然后点击下载 RocketMQ 的二进制文件:

下载RocketMQ的二进制文件

点击镜像源进行下载:

使用清华镜像源下载RocketMQ

接下来切换到 Linux 环境,创建文件夹 rocketmq:

1
mkdir /opt/rocketmq

使用 Xftp 将下载好的 RocketMQ 二进制文件上传到 Linux 环境下的 /opt/rocketmq 目录下。

进入 /opt/rocketmq 目录:

1
cd /opt/rocketmq

接下来需要对压缩文件进行解压,但由于这个压缩文件是 zip 格式的,因此需要先下载 unzip 解压工具:

1
yum install -y unzip zip

安装好之后再进行解压:

1
unzip rocketmq-all-4.9.0-bin-release.zip

在 /usr/local 目录下创建文件夹 rocketmq:

1
mkdir /usr/local/rocketmq

将刚刚解压后的文件移动到这个文件夹下:

1
mv /opt/rocketmq/rocketmq-all-4.9.0-bin-release /usr/local/rocketmq

3.3.2 修改初始内存

进入 /usr/local/rocketmq/rocketmq-all-4.9.0-bin-release 目录下:

1
cd /usr/local/rocketmq/rocketmq-all-4.9.0-bin-release

然后再进入 bin 目录:

1
cd bin

修改 runserver.sh

在 bin 目录下输入以下命令:

1
vim runserver.sh

进入该文件后,可以看到初始内存非常大:

runserver.sh修改前

改小一点,比如:

runserver.sh修改后

修改完成后,键入 ESC,输入 :wq 保存并退出。

修改 runbroker.sh

修改完 runserver.sh 并成功退出后,执行以下命令,修改 runbroker.sh:

1
vim runbroker.sh

进入该文件后,可以看到初始内存非常大:

runbroker.sh修改前

改小一点,比如:

runbroker.sh修改后

修改完成后,键入 ESC,输入 :wq 保存并退出。

3.3.3 启动

启动 Name Server

得先切换到 RocketMQ 的安装目录下

1
cd /usr/local/rocketmq/rocketmq-all-4.9.0-bin-release

运行以下命令启动 Name Server:

1
nohup sh bin/mqnamesrv &

输入上述命令后并没有什么反应,终端显示:

nohup: ignoring input and appending output to ‘nohup.out’

键入回车即可。

那我们怎么知道到底有没有启动成功呢?

可以输入以下命令查看日志:

1
tail -f ~/logs/rocketmqlogs/namesrv.log

如果日志信息中出现 The Name Server boot success. 的字样就表示启动成功。

日志查看完毕后,键入 Ctrl + c 退出。

除此之外,可以输入 jps 命令查看进程,可以看到进程中存在名为 NamesrvStartup 的进程。

启动 Broker

同样得先切换到 RocketMQ 的安装目录下

1
cd /usr/local/rocketmq/rocketmq-all-4.9.0-bin-release

运行以下命令启动 Broker:

1
nohup sh bin/mqbroker -n localhost:9876 &

注意: 这里的 localhost 应当换成你 Linux 的 IP 地址(公网 IP),同时需要 Linux 服务器暴露 9876、10909、10911 和 10912 四个端口(如果是虚拟机,执行命令 systemctl stop firewalld 关闭防火墙)。

输入上述命令后并没有什么反应,终端显示:

nohup: ignoring input and appending output to ‘nohup.out’

键入回车即可。

那我们怎么知道到底有没有启动成功呢?

可以输入以下命令查看日志:

1
tail -f ~/logs/rocketmqlogs/broker.log

如果日志信息中出现 The broker[xxx] boot success. 的字样就表示启动成功。

日志查看完毕后,键入 Ctrl + c 退出。

除此之外,可以输入 jps 命令查看进程,可以看到进程中存在名为 BrokerStartup 的进程。

3.3.4 发送/接收消息测试

以下命令的执行均在 RocketMQ 的安装目录下执行。

首先执行以下命令:

1
export NAMESRV_ADDR=localhost:9876

注意: 这里的 localhost 应当换成你 Linux 的 IP 地址(公网 IP)。

利用官方提供的 demo 进行测试,执行以下命令发送消息:

1
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

生产了消息还需要消费消息,同样使用官方提供的 demo:

1
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

利用上述几条命令也可以测试先前安全的 RocketMQ 是否正确。

3.3.5 关闭

同样是在 RocketMQ 的安装目录下执行以下命令,先关闭 Broker,再关闭 Name Server。

关闭 Broker:

1
sh bin/mqshutdown broker

关闭 Name Server:

1
sh bin/mqshutdown namesrv

3.4 控制台的安装与启动

3.4.1 基本步骤

RocketMQ 有一个可视化的 dashboard,通过该控制台可以直观的查看到很多数据。

下载地址:rocketmq-externals

rocketmq-externals

下载完成之后将其解压,解压完成之后可以看到这是一个 SpringBoot 工程,因此我们修改一下它的配置文件。

RocketMQ-Console的配置文件

打开这个配置文件后我们可以看到这两个配置:

1
2
server.port=8080
rocketmq.config.namesrvAddr=localhost:9876

8080 端口是经常使用的,为了防止端口占用,我们重新设置一下,除此之外还需要指定 Name Server 的地址(就是启动 Broker 时输入的 localhost:9876,当然你需要换成你自己的 IP 与端口)。

配置修改完成后,还需要在 rocketmq-console 的 pom.xml 中增加 JAXB 依赖(JAXB,Java Architechture for Xml Binding,用于XML绑定的Java技术,是一项可以根据 XML Schema 生成 Java 类的技术,也是一个业界标准)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>

添加完依赖后,在 rocketmq-console 所在的目录处打开 CMD,对项目进行打包。执行一下命令:

1
mvn clean package -Dmaven.test.skip=true

运行命令后,静静等待打包成功(注意,当前电脑上应当配置 Maven 环境)。如果出现 Build Success 的字样就表示打包成功。

打包完成之后,会生成 target 目录,在目录中有一个 jar 包,使用 Java 命令运行它:

1
java -jar rocketmq-console-ng-1.0.0.jar

启动完成之后,在浏览器中输入 localhost:7000 进行访问即可。

注意:访问时需要先启动 RocketMQ,再运行 jar 包。

如果无法正常访问,或访问后报错,前往下方【遇到的问题】一栏中查看是否是同一错误。

由于此时 RocketMQ 中并没有消息,因此可以先使用官方提供的 demo 生产一些消息:

1
2
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

注意: 这里的 localhost 应当换成你 Linux 的 IP 地址(公网 IP)。

生产了消息之后,可以前往“消息”页查看消息:

在RocketMQ控制台查看消息

3.4.2 遇到的问题

访问 lcoalhost:7000 后,出现以下错误:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <xxxx> failed

前往 RocketMQ 的安装目录下,进入 conf 目录:

1
cd conf

修改其中的 broker.conf 文件:

1
vim broker.conf

添加或修改以下内容:

brokerIP1 = xx.xxx.xxx.xx
namesrvAddr = xx.xxx.xxx.xx:9876

其中的 xx.xxx.xxx.xx 表示 Linux 服务器的 IP 地址(公网 IP)。

修改并保存后,关闭 Broker 服务,使用以下命令重启:

1
nohup sh bin/mqbroker -n xx.xxx.xxx.xx:9876 -c conf/broker.conf  >  bin/startMqBroker.log 2>&1 &

一定要追加 -c conf/broker.conf 表示用 broker.conf 的配置信息,否则是不行的。

成功启动 Broker 后,再运行 jar 包,访问 localhost:7000

3.5 集群搭建理论

RocketMQ集群架构图

PS:这里说的集群是指 Broker 集群。

3.5.1 数据复制与刷盘策略

RocketMQ复制与刷盘的区别

复制策略

复制策略是 Broker 的 Master 与 Slave 间的数据同步方式。分为同步复制与异步复制:

同步复制:消息写入 Master 后,Master 会等待 Slave 同步数据成功后才向 Producer 返回成功 ACK(确认字符,回执)。

异步复制:消息写入 Master 后,Master 立即向 Producer 返回成功 ACK,无需等待 Slave 同步数据成功。

异步复制策略会降低系统的写入延迟,RT(Reaction Time,响应时间)变小,提高了系统的吞吐量。

刷盘策略

刷盘策略指的是 Broker 中消息的 落盘 方式,即消息发送到 Broker 内存后消息持久化到磁盘的方式,同样分为同步与异步两种:

同步刷盘:当消息持久化到 Broker 的磁盘后才算是消息写入成功。

异步刷盘:当消息写入到 Broker 的内存后即表示消息写入成功,无需等待消息持久化到磁盘。

注意:

1、异步刷盘策略会降低系统的写入延迟,RT 变小,提高了系统的吞吐量;

2、消息写入到 Broker 的内存,一般是写入到了 PageCache;

3、对于异步刷盘策略,消息会写入到 PageCache 后立即返回成功 ACK,但并不会立即做落盘操作,而是当 PageCache 到达一定量时会自动进行落盘。

3.5.2 Broker集群模式

根据 Broker 集群中各个节点间关系的不同,Broker 集群可以分为以下几类:

单 Master

只有一个 Broker(这都其本质上就不能称为集群)。这种方式也只能是在测试时使用,生产环境下不能使用,因为存在单点问题。

多 Master

Broker 集群仅由多个 Master 构成,不存在 Slave。同一 Topic 的各个 Queue 会平均分布在各个 Master 节点上。

优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘则不会丢失),性能最高。

缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅(不可消费),消息实时性会受到影响。

注意: 以上优点的前提是,这些 Master 都配置了 RAID 磁盘阵列。如果没有配置,一旦出现某 Master 宕机,则会发生大量消息丢失的情况。

多 Master 多 Slave 模式 — 异步复制

Broker 集群由多个 Master 构成,每个 Master 又配置了多个 Slave(在配置了 RAID 磁盘阵列的情况下,一个 Master 一般配置一个 Slave 即可)。Master 与 Slave 的关系是主备关系,即 Master 负责处理消息的读写请求,而 Slave 仅负责消息的备份与 Master 宕机后的角色切换。

异步复制即前面所讲的 复制策略 中的 异步复制策略,即消息写入 Master 成功后,Master 立即向 Producer 返回成功 ACK,无需等待 Slave 同步数据成功。

该模式的最大特点是当 Master 宕机后 Slave 能够 自动切换 为 Master。不过由于 Slave 从 Master 的同步具有短暂的延迟(毫秒级),所以当 Master 宕机后,这种异步复制方式可能会存在少量消息的丢失问题,但 Slave 从 Master 同步的延迟越短,其可能丢失的消息就越少。

对于 Master 的 RAID 磁盘阵列,若使用的也是异步复制策略,同样也存在延迟问题,也可能会丢失消息,但 RAID 阵列的延迟是微秒级的(这是由硬件支持的),所以其丢失的数据量会更少。

多 Master 多 Slave 模式 — 同步双写

该模式是多 Master 多 Slave 模式的 同步复制 实现。所谓 同步双写,指的是消息写入 Master 成功后,Master 会等待 Slave 同步数据成功后才向 Producer 返回成功 ACK,即 Master 与 Slave 都要写入成功后才会返回成功 ACK,也即双写。

该模式与 异步复制模式 相比,优点是消息的安全性更高,不存在消息丢失的情况,但单个消息的 RT 略高,从而导致性能要略低(大约低 10%)。

该模式存在一个大的问题:对于目前的版本,Master 宕机后,Slave 不会自动切换到 Master。

最佳实践

一般会为 Master 配置 RAID10 磁盘阵列,然后再为其配置一个 Slave。这样做既利用了 RAID10 磁盘阵列的高效、安全性,又解决了可能会影响订阅的问题。

RAID 磁盘阵列的效率要高于 Master-Slave 集群。因为 RAID 是硬件支持的,也正因如此,所以 RAID 阵列的搭建成本较高。

多 Master + RAID 阵列,与多 Master 多 Slave 集群的区别是什么?

1、多 Master + RAID 阵列,其仅仅可以保证数据不丢失,即不影响消息写入,但其可能会影响到消息的订阅,但其执行效率要远高于多 Master 多 Slave 集群;
2、多 Master 多 Slave 集群,其不仅可以保证数据不丢失,也不会影响消息写入,但其运行效率要低于多 Master + RAID 阵列。

3.6 磁盘阵列 RAID

3.6.1 RAID历史

1988 年美国加州大学伯克利分校的 D. A. Patterson 教授等首次在论文 “A Case of Redundant Array of Inexpensive Disks” 中提出了 RAID 概念 ,即廉价冗余磁盘阵列( Redundant Array of Inexpensive Disks )。由于当时大容量磁盘比较昂贵, RAID 的基本思想是将多个容量较小、相对廉价的磁盘进行有机组合,从而以较低的成本获得与昂贵大容量磁盘相当的容量、性能、可靠性。随着磁盘成本和价格的不断降低, “廉价”已经毫无意义。因此, RAID 咨询委员会( RAID Advisory Board, RAB )决定用“独立”替代“廉价” ,于时 RAID 变成了独立磁盘冗余阵列( Redundant Array of Independent Disks )。但这仅仅是名称的变化,实质内容没有改变。

3.6.2 RAID 等级

RAID 这种设计思想很快被业界接纳, RAID 技术作为高性能、高可靠的存储技术,得到了非常广泛的应用。 RAID 主要利用镜像、数据条带和数据校验三种技术来获取高性能、可靠性、容错能力和扩展性,根据对这三种技术的使用策略和组合架构,可以把 RAID 分为不同的等级,以满足不同数据应用的需求。

D. A. Patterson 等的论文中定义了 RAID0 ~ RAID6 原始 RAID 等级。随后存储厂商又不断推出 RAID7、RAID10、RAID01、RAID50、 RAID53、RAID100 等 RAID 等级,但这些并无统一的标准。目前业界与学术界公认的标准是 RAID0 ~ RAID6,而在实际应用领域中使用最多的 RAID 等级是 RAID0、RAID1、RAID3、RAID5、RAID6 和 RAID10。

RAID 每一个等级代表一种实现方法和技术,等级之间并无高低之分。在实际应用中,应当根据用户的数据应用特点,综合考虑可用性、性能和成本来选择合适的 RAID 等级,以及具体的实现方式。

3.6.3 关键技术

镜像技术

镜像技术是一种冗余技术,为磁盘提供数据备份功能,防止磁盘发生故障而造成数据丢失。对于 RAID 而言,采用镜像技术最典型地的用法就是,同时在磁盘阵列中产生两个完全相同的数据副本,并且分布在两个不同的磁盘上。镜像提供了完全的数据冗余能力,当一个数据副本失效不可用时,外部系统仍可正常访问另一副本,不会对应用系统运行和性能产生影响。而且,镜像不需要额外的计算和校验,故障修复非常快,直接复制即可。镜像技术可以从多个副本进行并发读取数据,提供更高的读 I/O 性能,但不能并行写数据,写多个副本通常会导致一定的 I/O 性能下降。

镜像技术提供了非常高的数据安全性,其代价也是非常昂贵的,需要至少双倍的存储空间。高成本限制了镜像的广泛应用,主要应用于至关重要的数据保护,这种场合下的数据丢失可能会造成非常巨大的损失。

数据条带化技术

数据条带化技术是一种自动将 I/O 操作负载均衡到多个物理磁盘上的技术。更具体地说就是,将一块连续的数据分成很多小部分并把它们分别存储到不同磁盘上。这就能使多个进程可以并发访问数据的多个不同部分,从而获得最大程度上的 I/O 并行能力,极大地提升性能。

数据校验技术

数据校验技术是指 RAID 要在写入数据的同时进行校验计算,并将得到的校验数据存储在 RAID 成员磁盘中。校验数据可以集中保存在某个磁盘或分散存储在多个不同磁盘中。当其中一部分数据出错时,就可以对剩余数据和校验数据进行反校验计算重建丢失的数据。

数据校验技术相对于镜像技术的优势在于节省大量开销,但由于每次数据读写都要进行大量的校验运算,对计算机的运算速度要求很高,且必须使用硬件 RAID 控制器。在数据重建恢复方面,检验技术比镜像技术复杂得多且慢得多。

3.6.4 RAID分类

从实现角度看, RAID 主要分为软 RAID、硬 RAID 以及混合 RAID 三种。

软 RAID

所有功能均有操作系统和 CPU 来完成,没有独立的 RAID 控制处理芯片和 I/O 处理芯片,效率自然最低。

硬 RAID

配备了专门的 RAID 控制处理芯片和 I/O 处理芯片以及阵列缓冲,不占用 CPU 资源。效率很高,但成本也很高。

混合 RAID

具备 RAID 控制处理芯片,但没有专门的I/O 处理芯片,需要 CPU 和驱动程序来完成。性能和成本在软 RAID 和硬 RAID 之间。

3.7 常见 RAID 等级详解

3.7.1 JBOD

JBOD ,Just a Bunch of Disks,磁盘簇。它表示一个没有控制软件提供协调控制的磁盘集合,这是 RAID 区别与 JBOD 的主要因素。JBOD 将多个物理磁盘串联起来,提供一个巨大的逻辑磁盘。

JBOD 的数据存放机制是由第一块磁盘开始按顺序往后存储,当前磁盘存储空间用完后,再依次往后面的磁盘存储数据。 JBOD 存储性能完全等同于单块磁盘,而且也不提供数据安全保护。其只是简单提供一种扩展存储空间的机制,JBOD 可用存储容量等于所有成员磁盘的存储空间之和。

现如今 JBOD 常指磁盘柜,而不论其是否提供 RAID 功能。不过,JBOD 并非官方术语,官方称为 Spanning。

3.7.2 RAID0

RAID0

RAID0 是一种简单的、无数据校验的 数据条带化技术。实际上不是一种真正的 RAID ,因为它并不提供任何形式的冗余策略。 RAID0 将所在磁盘条带化后组成大容量的存储空间,将数据分散存储在所有磁盘中,以独立访问方式实现多块磁盘的并读访问。

理论上讲,一个由 n 块磁盘组成的 RAID0 ,它的读写性能是单个磁盘性能的 n 倍,但由于总线带宽等多种因素的限制,实际的性能提升低于理论值。由于可以并发执行 I/O 操作,总线带宽得到充分利用,再加上不需要进行数据校验,RAID0 的性能在所有 RAID 等级中是最高的

RAID0 具有低成本、高读写性能、 100% 的高存储空间利用率等优点,但是它不提供数据冗余保护,一旦数据损坏,将无法恢复。

应用场景:对数据的顺序读写要求不高,对数据的安全性和可靠性要求不高,但对系统性能要求很高的场景。

RAID0 与 JBOD 的异同

相同点:

1、存储容量:都是成员磁盘容量总和;

2、磁盘利用率,都是 100%,即都没有做任何的数据冗余备份。

不同点:

JBOD:数据是顺序存放的,一个磁盘存满后才会开始存放到下一个磁盘。

RAID0:各个磁盘中的数据写入是并行的,是通过数据条带技术写入的,其读写性能是 JBOD 的 n 倍。

3.7.3 RAID1

RAID1

RAID1 就是一种 镜像技术,它将数据完全一致地分别写到工作磁盘和镜像磁盘,它的磁盘空间利用率为 50% 。 RAID1 在数据写入时,响应时间会有所影响,但是读数据的时候没有影响。 RAID1 提供了最佳的数据保护,一旦工作磁盘发生故障,系统将自动切换到镜像磁盘,不会影响使用。

RAID1是为了增强数据安全性使两块磁盘数据呈现完全镜像,从而达到安全性好、技术简单、管理方便。 RAID1 拥有完全容错的能力,但实现成本高。

应用场景:对 顺序 读写性能要求较高,或对数据安全性要求较高的场景。

3.7.4 RAID10

RAID10

RAID10 是一个 RAID1 与 RAID0 的组合体,所以它继承了 RAID0 的快速和 RAID1 的安全。

简单来说就是,先做条带,再做镜像。也就是先将进来的数据分散到不同的磁盘,再将磁盘中的数据做镜像。

3.7.5 RAID01

RAID01

RAID01 是一个 RAID0 与 RAID1 的组合体,所以它继承了 RAID0 的快速和 RAID1 的安全。

简单来说就是,先做镜像再做条带。也就是将进来的数据先做镜像,再将镜像数据写入到与之前数据不同的磁盘,即再做条带。

RAID10 要比 RAID01 的容错率再高,所以生产环境下一般使用 RAID10。

3.8 集群搭建实践

3.8.1 集群架构

在此我们将搭建一个双主双从异步复制的 Broker 集群。为了方便,这里使用了两台主机来完成集群的搭建。你可以选择使用两台阿里云 ECS 来实现,也可以使用虚拟机与主机克隆来实现。

这两台主机的功能与 Broker 角色分配如下表:

序号 主机名/IP IP 功能 Broker角色
1 rocketmqOS1 192.168.59.164 NameServer + Broker Master1 + Slave2
2 rocketmqOS2 192.168.59.165 NameServer + Broker Master2 + Slave1

以上信息均根据参考视频编写,而非博主实际环境。

在进行集群搭建时,首先需要在这两台主机上安装单机版的 RocketMQ 以保证环境的正确。

3.8.2 修改 rocketmqOS1 配置文件

前往 RocketMQ 的解压目录,进入 conf 目录:

1
cd conf

再执行命令 ll,可以看到这个目录下有这些文件:

RocketMQ中的conf目录

由于我们需要搭建一个双主双从异步复制的 Broker 集群,因此切换到 2m-2s-async 目录下:

1
cd 2m-2s-async

2m-2s-async目录下的文件信息

然后修改 broker-a.properties 文件,配置内容修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=DefaultCluster
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-a
# master的brokerId为0
brokerId=0
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制master
brokerRole=ASYNC_MASTER
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876

然后再修改 broker-b-s.properties 文件,配置内容修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
brokerClusterName=DefaultCluster
# 指定这是另外一个master-slave集群
brokerName=broker-b
# slave的brokerId为非0
brokerId=1
deleteWhen=04
fileReservedTime=48
# 指定当前broker为slave
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
# 指定Broker对外提供服务的端口,即Broker与producer与consumer通信的端口。默认
10911。由于当前主机同时充当着master1与slave2,而前面的master1使用的是默认端口。这
里需要将这两个端口加以区分,以区分出master1与slave2
listenPort=11911
# 指定消息存储相关的路径。默认路径为~/store目录。由于当前主机同时充当着master1与
slave2,master1使用的是默认路径,这里就需要再指定一个不同路径
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort

这样就配置好一台主机了,按照同样的方式修改 rocketmqOS2 的配置文件。

其他配置

我们还可以在配置文件中设置其他属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#指定整个broker集群的名称,或者说是RocketMQ集群的名称
brokerClusterName=rocket-MS
#指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#默认为新建Topic所创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议生产环境中关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议生产环境中关闭
autoCreateSubscriptionGroup=true
#Broker对外提供服务的端口,即Broker与producer与consumer通信的端口
listenPort=10911
#HA高可用监听端口,即Master与Slave间通信的端口,默认值为listenPort+1
haListenPort=10912
#指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
#指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
#指定commitLog目录中每个文件的大小,默认1G
mapedFileSizeCommitLog=1073741824
#指定ConsumeQueue的每个Topic的每个Queue文件中可以存放的消息数量,默认30w条
mapedFileSizeConsumeQueue=300000
#在清除过期文件时,如果该文件被其他线程所占用(引用数大于0,比如读取消息),此时会阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间戳。该属性则表示从第一次拒绝删除后开始计时,该文件最多可以保留的时长。在此时间内若引用数仍不为0,则删除仍会被拒绝。不过时间到后,文件将被强制删除
destroyMapedFileIntervalForcibly=120000
#指定commitlog、consumequeue所在磁盘分区的最大使用率,超过该值,则需立即清除过期文件
diskMaxUsedSpaceRatio=88
#指定store目录的路径,默认在当前用户主目录中
storePathRootDir=/usr/local/rocketmq-all-4.5.0/store
#commitLog目录路径
storePathCommitLog=/usr/local/rocketmq-all-4.5.0/store/commitlog
#consumeueue目录路径
storePathConsumeQueue=/usr/local/rocketmq-all-4.5.0/store/consumequeue
#index目录路径
storePathIndex=/usr/local/rocketmq-all-4.5.0/store/index
#checkpoint文件路径
storeCheckpoint=/usr/local/rocketmq-all-4.5.0/store/checkpoint
#abort文件路径
abortFile=/usr/local/rocketmq-all-4.5.0/store/abort
#指定消息的最大大小
maxMessageSize=65536
#Broker的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盘策略
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=192.168.3.105

3.8.3 修改 rocketmqOS2 配置文件

同样需要前往 RocketMQ 安装目录下的 conf 目录的子目录 2m-2s-async,然后修改其中的两个配置文件,只不过这里修改的配置文件与 rocketmqOS1 修改的配置文件不一样。

首先需要修改 broker-b.properties 文件,配置内容修改为如下:

1
2
3
4
5
6
7
8
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876

然后再修改 broker-a-s.properties 文件,配置内容修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
listenPort=11911
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort

到此,两台主机的配置就修改完毕了,接下来逐一启动即可。

3.8.4 启动 RocketMQ

首先启动 rocketmqOS1 与 rocketmqOS2 两个主机中的 NameServer,它们启动命令完全相同:

1
2
nohup sh bin/mqnamesrv & # 启动命令
tail -f ~/logs/rocketmqlogs/namesrv.log # 日志查看命令

然后再分别启动 rocketmqOS1 与 rocketmqOS2 两个主机中的 Broker Master。需要注意的是指定它们所要加载的配置文件是不一样的。

1
2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
1
2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log

最后分别启动 rocketmqOS1 与 rocketmqOS2 两个主机中的 Broker Slave。同样,指定它们所要加载的配置文件也是不一样的。

1
2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
1
2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

到此,集群就搭建并启动完毕了。

如果执行 jps 命令,可以看到有两个 BrokerStartUp 进程。

如果我们还想使用控制台 rocketmq-externals,这需要修改工程的配置文件,将两个 NameServer 的信息追加在配置文件中(以逗号隔开即可)。

3.9 mqadmin 命令

在 RocketMQ 解压目录的 bin 目录下有一个 mqadmin 命令,该命令是一个运维指令,可对 MQ 的主题、集群、Broker 等信息进行管理。

但在运行 mqadmin 命令之前,先要修改 MQ 解压目录下 bin/tools.sh 配置的 JDK ext 目录的位置。

本机 JDK 的 ext 目录所在位置是:/usr/local/java/jdk1.8.0_301/jre/lib/ext

在 RocketMQ 的安装目录下切换到 bin 目录:

1
cd bin

然后修改 tools.sh 文件:

1
vim tools.sh

JAVA_OPT 配置的 -Djava.ext.dirs 这一行的后面添加 JDK ext 的路径:

修改tools.sh配置

修改完毕后,在 MQ 安装目录下直接运行该命令,可以看到其可以添加的 Commands,通过这些 Commands 可以完成很多的功能:

1
./bin/mqadmin
The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   QueryMsgTraceById    query a message trace
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.
   sendMessage          Send a message
   consumeMessage       Consume message
   updateAclConfig      Update acl config yaml file in broker
   deleteAccessConfig   Delete Acl Config Account in broker
   clusterAclConfigVersion List all of acl config version information in cluster
   updateGlobalWhiteAddr Update global white address for acl Config File in broker
   getAccessConfigSubCommand List all of acl config information in cluster
   See 'mqadmin help <command>' for more information on a specific command.

关于该命令在官网中有详细的用法解释:mqadmin命令用法官方文档

mqadmin命令官方文档示例


  1. Topic与Tag最佳实践 ↩︎

  2. RocketMQ快速开始 ↩︎