# rabbitMQ ## 引言 ### 什么是MQ MQ(Message Quene) 翻译为 消息队列 ,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成 ### MQ有哪些 当今市面上很多主流的消息中间件,如老牌ActiveMQ、RabbitMQ,炙手可热的kafka,阿里自主开发RocketMQ等 ### 不同MQ的特点 #### 1、ActiveMQ Apache出品,最流行的,能力强劲的开源消息总线。他是完全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎(性能瓶颈比较大,吞吐量不太高) #### 2、Kafka Kafka是分布式发布-订阅消息系统,主要特点基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。(效率最高,但是不保证数据丢失以及重复) #### 3、RocketMQ 阿里开源消息中间件,纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,RocketMQ思路起源于kafka,但是并不是kafka的一个Copy,它对消息的可靠传输及事务性做优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推算、日志流式处理、binlog分布等场景(收费,开源的功能被阉割) #### 4、RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现,AMQP的主要特征是面向消息、队列、路由(包括点对点的发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次 RabbitMQ比kafka可靠,kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。 ## RabbitMQ详解 ### RabbitMQ介绍 是一个在AMQP(Advanced Message Queuing Protocol)基础上实现,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。支持多种客户端如: Python 、Ruby 、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP、XMPP、SMTP、STOMP,也正是如此,使得它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker架构,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load Balance)或者数据持久化都有很好的支持。 ### RabbitMQ的特点 可靠性、灵活的路由、扩展性、高可用性、多种协议、多语言客户端、管理界面、插件机制 ### AMQP介绍 AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品以及不同开发语言等条件的限制。 ### 什么是消息队列 MQ全程Message Queue,消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用链接来链接他们。 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接受和发送应用程序同时执行的要求 在项目中,将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量 ### RabbitMQ应用场景 对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统。那么这些模块之间如何通信?这个传统的IPC有很大区别,传统的IPC很多都是在单一系统上的,慕课耦合性很大,不适合扩展(Scalability),如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如: 1、信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失? 2、如何降低发送者和接收者的耦合度? 3、如何让priority高的接收者先接到数据? 4、如何做到load balance?有效均衡接收者的负载? 5、如何有效的将数据发送到相关的接收者?也就是说将接受者subscribe不同的数据,如何做有效的filter。 6、如何做到可扩展,甚至将这个通信模块发到cluster上? 7、如何保证接收者接收到了完整,正确的数据? AMQP协议解决了以上的问题,而RabbitMQ实现了AMQP ### RabbitMQ概念介绍 Broker:简单来说就是消息队列服务器实体 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列 Queue:消息队列载体,每个消息都会被投入到一个或者多个队列 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来 Routing Key:路由关键字,exchange根据这个关键字进行消息投递 Vhost:虚拟主机,一个broker里可以开设多个vhost,作用不同用户的权限分离 Producer:消息生产者,就是投递消息的程序 Consumer:消息消费者,就是接受消息的程序 Channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务。 RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息 ### RabbitMQ使用流程 AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到响应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。消息队列的使用过程大概如下: 1、客户端连接到消息队列服务器,打开一个channel。 2、客户端声明一个exchange,并设置相关属性 3、客户端声明一个queue,并设置相关属性 4、客户端使用routing key,在exchange和queue之间建立好绑定关系 5、客户端投递消息到exchange Exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。Exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如:绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为abc的才会投递到队列 ## RabbitMQ单机安装 ### 安装arlang 官网https://www.erlang-solutions.com/resources/download.html 注意需要下載rabbitmq版本所支持对应的版本 ``` [root@master ~]# cd /usr/local/src/ [root@master src]# mkdir /usr/local/src/rabbitmq [root@master src]# cd !$ [root@master rabbitmq]# wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.2.1-1~centos~7_amd64.rpm [root@master rabbitmq]# yum -y install esl-erlang_23.2.1-1~centos~7_amd64.rpm ``` ### 安装rabbitMQ 官网:https://www.rabbitmq.com/install-rpm.html#install-erlang ``` [root@master rabbitmq]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm [root@master rabbitmq]# yum -y install rabbitmq-server-3.8.9-1.el7.noarch.rpm 常用命令: 启动 [root@master rabbitmq]# systemctl start rabbitmq-server 查看状态 [root@master rabbitmq]# systemctl status rabbitmq-server 停止 [root@master rabbitmq]# systemctl stop rabbitmq-server 开机自启动 [root@master rabbitmq]# systemctl enable rabbitmq-server 设置配置文件参考https://www.rabbitmq.com/configure.html ``` ### 相关操作 ``` 开启web界面管理工具 [root@master rabbitmq]# rabbitmq-plugins enable rabbitmq_management 关闭web界面管理工具 [root@master rabbitmq]# rabbitmq-plugins disbale rabbitmq_management 用户管理:默认用户guest,如果不配置的话只能在本地登录 新增一个用户: [root@master rabbitmq]# rabbitmqctl add_user admin 123456 删除用户 rabbitmqctl delete_user admin 修改用户的密码 rabbitmqctl change_password admin 120110 查看当前用户列表 Rabbitmqctl list_user 用户角色,分为5类,超级管理员、监控者、策略制定者、普通管理员以及其他 超级管理员(administrator) 可登录管理控制台(启用management plugin的情况下),可查看所有信息,并且可以对用户,策略进行操作 监控者(monitoring) 可登录管理控制台(启动management plugin的情况下),可以查看rabbitmq节点的相关信息(进程数、内存使用情况、磁盘使用情况) 策略制定者(policymaker) 可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 与administrator的对比,administrator能看到这些内容 普通管理者(management) 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。 了解了这些后,就可以根据需要给不同的用户设置不同的角色,以便按需管理。 如:给admin设置超级管理员角色权限 [root@master rabbitmq]# rabbitmqctl set_user_tags admin administrator 用户权限 用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。 例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。 相关命令为: (1) 设置用户权限 rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP (2) 查看(指定hostpath)所有用户的权限信息 rabbitmqctl  list_permissions  [-p  VHostPath] (3) 查看指定用户的权限信息 rabbitmqctl  list_user_permissions  User (4)  清除用户的权限信息 rabbitmqctl  clear_permissions  [-p VHostPath]  User 查看所有的队列 [root@master rabbitmq]# rabbitmqctl list_queues 清除所有的队列 [root@master rabbitmq]# rabbitmqctl reset 关闭应用 [root@master rabbitmq]# rabbitmqctl stop_app 启动应用 [root@master rabbitmq]# rabbitmqctl start_app ``` ![image-20210515210448320](/images/posts/image-20210515210448320.png) ## rabbitMQ集群部署以及配置 RabbitMQ是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署Rabbitmq分布式集群时需要先按照Erlang,并把其中一个服务的Cookie复制到另外的节点。 RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,一般的。均应建立为磁盘节点,为了防止机器重启后的消息丢失; RabbitMQ的cluster集群模式一般分为2种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制。 普通模式下,以两个节点(Rabbit01、Rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点Rabbit01(或者Rabbit02),Rabbit01和Rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连接rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈 镜像模式下,将需要消费的队列变为了镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样临时读取。缺点就是,集群内部的同步通讯会占用大量的网络宽度 ### 集群安装 | 主机名 | Ip | | --------- | -------------- | | Rabbitmq1 | 192.168.116.20 | | Rabbitmq2 | 192.168.116.21 | | Rabbitmq3 | 192.168.116.22 | 节点数目 因为有些功能(如:仲裁队列、MQTT中的客户端跟踪)需要集群成员即节点之间达成共识,所以集群中的节点数最好使用奇数 节点发现 rabbitmq 集群有很多节点发现方式 1、启动rabbitmq之后,使用命令行工具动态加入节点到集群 2、配置文件发现,在配置文件中直接写死有哪些节点,例如 cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config # 配置节点发现方式为配置文件发现(静态),还有其他的发现方式(动态的) 或者直接在命令行加节点 cluster_formation.classic_config.nodes.1 = test_rabbit1@rabbit1 cluster_formation.classic_config.nodes.2 = test_rabbit2@rabbit2 cluster_formation.classic_config.nodes.3 = test_rabbit3@rabbit3 3、其他:如DNS动态发现等,可以查看官方文档 ### 基础环境准备(所有机器) ``` 1 修改hosts,保证3台主机都能互通(都需要修改)以及关闭selinux和防火墙 [root@rabbitmq2 ~]# cat /etc/hosts 192.168.116.20 rabbitmq1 192.168.116.21 rabbitmq2 192.168.116.22 rabbitmq3 [root@rabbitmq1 rabbit]# systemctl disable --now firewalld sed -ri 's/(^SELINUX=).*/\1disabled/' /etc/selinux/config && setenforce 0 2 安装依赖环境erlange [root@rabbitmq1 ~]# mkdir rabbit [root@rabbitmq1 ~]# cd !$ [root@rabbitmq1 rabbit]# wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.2.1-1~centos~7_amd64.rpm [root@rabbitmq1 rabbit]# yum -y install esl-erlang_23.2.1-1~centos~7_amd64.rpm 3 安装rabbitmq [root@rabbitmq1 rabbit]# wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.8.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.8.9-1.el7.noarch.rpm [root@rabbitmq1 rabbit]# yum -y install rabbitmq-server-3.8.9-1.el7.noarch.rpm 4 分别启动rabbitmq以及rabbitmq_manage systemctl enable --now rabbitmq-server rabbitmq-plugins enable rabbitmq_management 因为默认guest不能远程登陆,所以分别创建登陆用户,并授予admin权限 rabbitmqctl add_user admin 123456 rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" ``` ### 搭建rabbitmq的一般模式集群 安装完rabbitmq之后,在机器中会有如下一个文件。路径在$HOME中或者在/var/lib/rabbitmq中,文件名称为.erlang.cookie,他是一个隐藏文件,name这文件存储的内容是什么?有什么作用? rabbitMQ的集群是依赖erlang集群,而erlang集群是通过这个cookie进行通信认证的,因此我们做集群的第一个就是干cookie ``` 1 统一erlang.cookie文件中的cookie值 必须要使集群中所有rabbitmq的.erlang.cookie文件中的cookie值一致,并且权限为owner只读 [root@rabbitmq1 ~]# scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq/ [root@rabbitmq1 ~]# scp /var/lib/rabbitmq/.erlang.cookie rabbitmq3:/var/lib/rabbitmq/ [root@rabbitmq1 ~]# ll /var/lib/rabbitmq/ -a -r-------- 1 rabbitmq rabbitmq 20 Dec 30 00:00 .erlang.cookie 所有机器重启rabbitmq systemctl restart rabbitmq-server 2、查看集群状态 [root@rabbitmq3 ~]# rabbitmqctl status #查看单个rabbitmq状态 [root@master ~]# rabbitmqctl cluster_status 此时的集群状态还只有1台机器 3、将节点加入集群,在rabbitmq2/rabbitmq3上执行 rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@rabbitmq1 #注意rabbit@master 为第一个机器,可以用命令rabbitmqctl status查看 rabbitmqctl start_app 再次进行查看可以看到其他节点的信息,web端也可以看到其他节点的信息,如下: ``` ![image-20210515210344082](/images/posts/image-20210515210344082.png) ### 镜像模式 镜像模式需要依赖policy模块 需要设置exchanges或者queue的数据需要复制同步,如何复制同步 ``` [root@rabbitmq1 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' 参数意思: - ha-all 集群策略名称 - ^ : 匹配符。只有一个^代表匹配所有,^xie 为匹配名称为xie的exchanges或者queue - ha-mode 为匹配类型,分为3种模式 - all 所有queue - exctly 部分(需要配置ha-params参数,此参数为int类型,比如3,众多集群中的随机3台机器) - node 指定(需要配置ha-params参数,此参数为数组类型,比如[“rabbit@mini1”,”rabbit@mini2”]这样来指定为F与G这2台机器) ``` 在web管理界面也是可以配置的 ![image-20210515210621103](C:\Users\谢优胜\Desktop\ZABBIX\image-20210515210621103.png) ## 配置内存|磁盘空间阈值 ### 支持的单位符号 \## k, kiB: kibibytes (2^10 - 1,024 bytes) \## M, MiB: mebibytes (2^20 - 1,048,576 bytes) \## G, GiB: gibibytes (2^30 - 1,073,741,824 bytes) \## kB: kilobytes (10^3 - 1,000 bytes) \## MB: megabytes (10^6 - 1,000,000 bytes) \## GB: gigabytes (10^9 - 1,000,000,000 bytes) ### 内存警告 RabbitMQ服务器在启动和执行 rabbitmqctl set_vm_memory_high_watermark 的时候会检测机器上的RAM大小,默认情况下,RabbitMQ使用内存超过40%的时候,会发出内存警告,阻塞所有发布消息的连接,一旦警告解除(例如:服务器paging消息到硬盘或者分发消息到消费者并且确认)服务会恢复正常。 默认的内存阀值是40%,注意,这并不会阻止RabbitMQ Server使用不到40%,仅仅意味着到达这个点的时候,发布者会被阻塞block,最坏的情况下,Erlang虚拟机会引起双倍的内存使用(RAM的80%),强烈建议开启操作系统的SWAP和Page files. 内存达到阀值后,发布者会被阻塞,但是消费者不会被阻塞,消费者继续消费消息,当内存降低到阀值以下后,发布者继续开始发布消息。 #### 配置内存阈值 ``` vm_memory_high_watermark.relative = 0.4 另外我们可以设置节点使用的RAM的限制(以字节为单位) vm_memory_high_watermark.absolute = 1073741824 另外你可以使用内存单位设置阀值,如果定义了relative,那么absolute将会被忽略 vm_memory_high_watermark.absolute = 2GB rabbitmq ctl工具修改阀值,服务重启后设置失效,如果要永久生效要修改配置文件 rabbitmqctl set_vm_memory_high_watermark 0.4 配置paging阀值 队列中的消息到达阀值上限之前,它会尝试将页面消息page到磁盘上以释放内存; 默认情况下:在broker达到阀值的50%时(默认内存阀值是0.4)会发生这种情况,可以通过修改如下配置进行修改: vm_memory_high_watermark_paging_ratio = 0.5 禁用所有的发布者 设置阀值(threshold)为0,会立即出发内存警告,阻塞所有的发布连接(如果你希望禁用全局发布); rabbitmqctl set_vm_memory_high_watermark 0 经典配置,可以直接配置在advanced.config配置文件中,永久有效 [{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75},{vm_memory_high_watermark, 0.4}]}]. ``` ### 磁盘警告 当可用的磁盘空间下降到配置值(最低阀值默认为48MIB,版本3.8.0)之下,会触发所有的警告,所有的生产者会被阻塞,目标是避免填充整个磁盘,这个会导致写操作的失败,导致RabbitMQ中断,为了减小填充磁盘的风险,所有进来的消息都会阻塞;但是消费者不会被阻塞,直到磁盘的可用空间升高到最低阀值之上,生产者就会继续开始推送消息。 #### 配置磁盘阈值 ``` 低于1.0的值很危险,应谨慎使用,值为1.0代表16GIB disk_free_limit.relative = 2.0 磁盘可用空间absolute设置,单位:字节 disk_free_limit.absolute = 1000000000 加上单位 disk_free_limit.absolute = 1GB 使用CTL工具修改阀值,服务重启后设置失效,如果要永久生效要修改配置文件 rabbitmqctl set_disk_free_limit 42949672000 经典配置,mem_relative值为1.0代表16GIB,可以直接配置在advanced.config配置文件中,永久有效 [{rabbit, [{disk_free_limit, {mem_relative, 1.0}}]}]. ```