侧边栏壁纸
博主头像
嘉顺

顺着一路星光,去往有你的嘉处

  • 累计撰写 3 篇文章
  • 累计创建 1 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

RabbitMQ

Administrator
2026-05-22 / 0 评论 / 0 点赞 / 25 阅读 / 0 字

RabbitMQ

四大核心概念:

生产者:产生数据发送消息的程序

交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 。

队列:队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

RabbitMQ中的名词介绍:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

  • Connection:publisher/consumer 和 broker 之间的 TCP 连接

  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

  • Exchange:交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

  • Queue:消息最终被送到这里等待 consumer 取走

  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

文件配置

rabbitmq默认的配置文件位于/etc/rabbitmq/rabbitmq.conf,可以通过环境变量覆盖默认的配置文件路径

#环境变量实现
    export RABBITMQ_CONFIG_FILE=/custom/path/rabbitmq
    #不加扩展名
    #实际读取的是 /custom/path/rabbitmq.conf
​
#修改rabbitmq-env.conf变量配置文件实现
    vim /etc/rabbitmq/rabbitmq-env.conf
    NODENAME=rabbit@rabbitmq-server1            #节点名称(必须唯一)
    CONFIG_FILE=/custom/path/my-rabbitmq.conf   #配置文件路径
    MNESIA_BASE=/data/rabbitmq/mnesia           #数据存储目录
    LOG_BASE=/data/logs/rabbitmq                #日志存储路径

RabbitMQ 3.7.0+ 开始使用 INI-like 格式(Key = Value),也支持旧版本的 Erlang 格式(advanced.config),但推荐使用新版格式。新版本有些配置项无法配置,只能通过旧版本配置进行配置,两个配置文件同时存在也不会出错,会合并生效。rabbitmq.conf配置优先级高于advanced.config

配置实例

# 查看RabbitMQ当前配置项
rabbitmqctl environment
rabbitmq-diagnostics environment
 
# 查看某项配置
rabbitmqctl environment | grep channel_max
​
vim /etc/rabbitmq/rabbitmq.conf
#网络与监听设置
    # RabbitMQ监听端口(默认5672,AMQP协议)
    listeners.tcp.default = 5672
​
    # 绑定的IP地址(默认监听所有 0.0.0.0)
    listeners.tcp.1 = 0.0.0.0:5672
​
    # 启用TLS/SSL
    listeners.ssl.default = 5671
    ssl_options.cacertfile = /etc/rabbitmq/ca_certificate.pem
    ssl_options.certfile = /etc/rabbitmq/server_certificate.pem
    ssl_options.keyfile = /etc/rabbitmq/server_key.pem
    ssl_options.verify = verify_peer
    ssl_options.fail_if_no_peer_cert = true
​
#认证和权限
    # 默认用户(管理用)
    default_user = guest
    default_pass = guest
​
    # 允许远程访问 guest 用户(默认false)
    loopback_users.guest = false
​
#消息队列参数(调整性能)
    # 单连接最大并发channel数
    channel_max = 2048
​
    # 单消息最大字节数(默认128MB)
    frame_max = 131072
​
    # 心跳检测时间(秒)
    heartbeat = 60
​
    # 每个队列最大长度(条数)
    # 注意:这通常通过策略(policy)而非配置文件设置
    
#持久化与存储
    # 数据存储路径
    # 默认:/var/lib/rabbitmq/mnesia/
    # 使用磁盘节点时重要
    node_data_dir = /var/lib/rabbitmq/mnesia/rabbit@hostname
​
#日志配置
    # 日志文件路径
    log.dir = /var/log/rabbitmq
    log.file = /var/log/rabbitmq/rabbit.log
​
    # 日志级别(debug, info, warning, error)
    log.level = info
    
#插件配置
    # 启用management插件相关配置
    management.tcp.port = 15672
    management.load_definitions = /etc/rabbitmq/definitions.json

日志管理

syslog

#日志服务端配置
vim /etc/rsyslog.conf
    module(load="imtcp") # needs to be done just once
    input(type="imtcp" port="514")
    
    #定义模板,按来源 IP 分目录存储
    $template RemoteLogTemplate, "/var/log/remote/%FROMHOST-IP%/syslog.log"
​
    #所有非本机日志套用该模板
    :fromhost-ip, !isequal, "127.0.0.1" ?RemoteLogTemplate
    
#重启使其生效    
systemctl restart rsyslog.service
如果不做模板,定义IP分目录存储,那么客户端和本地的日志都会混合存储在/var/log/messages中
​
​
#TCP/UDP
vim /etc/rabbitmq/rabbitmq.conf
    log.syslog = true                           #启用syslog输出
    log.syslog.transport = tcp                  #使用TCP协议进行传输,或者UDP(UDP速度更快,TCP安全)
    log.syslog.protocol = rfc5424               #使用 RFC 5424 标准的 Syslog 协议格式(可以不写)
    log.syslog.host = my.syslog-server.local    #远程日志服务器地址
    log.syslog.port = 1514                      #远程日志服务器监听端口
   
#TLS加密
    log.syslog = true
    log.syslog.transport = tls
    log.syslog.protocol = rfc5424
    log.syslog.host = my.syslog-server.local
    log.syslog.port = 1514
​
    log.syslog.ssl_options.cacertfile = /path/to/ca_certificate.pem
    log.syslog.ssl_options.certfile = /path/to/client_certificate.pem
    log.syslog.ssl_options.keyfile = /path/to/client_key.pem
    
    # 只发送 warning 及以上级别的日志到 syslog
    log.syslog.level = warning
​
    # 格式化为 JSON 便于日志平台处理(如 ELK)
    log.syslog.formatter = json
​
    # 自定义在 syslog 中的标识
    log.syslog.identity = my_rabbitmq
    log.syslog.facility = user
    
#rabbitmq配置
vim /etc/rabbitmq/rabbitmq.conf
    log.syslog = true
    log.syslog.transport = tcp
    log.syslog.host = 192.168.135.40
    log.syslog.port = 514
​
#重启生效
systemctl restar rabbitmq-server.service

logrotate

RabbitMQ 官方推荐使用系统的 logrotate 工具进行日志切割。这是最稳定、最通用的方式。

vim /etc/logrotate.d/rabbitmq-server    #配置参数不需要记住,用到的时候去查就行了
/var/log/rabbitmq/*.log {
    weekly
    missingok
    rotate 8
    compress
    delaycompress
    notifempty
    create 640 rabbitmq adm
    postrotate
        /usr/bin/rabbitmqctl reopen_logs > /dev/null
    endscript
}
#如果使用syslog进行日志管理,就得在syslog服务端对日志进行logrotate轮转管理

MQ内置

从 RabbitMQ 3.8 开始,支持内置的日志切割功能,无需依赖 logrotate

vim /etc/rabbitmq/rabbitmq.conf
    # 启用基于大小的切割
    log.file.rotation.size = 10485760  # 10MB
​
    # 启用基于时间的切割(每天)
    log.file.rotation.date = $D0
​
    # 保留文件数量
    log.file.rotation.count = 10
    
#示例
# 方案 A:仅按文件大小轮转 (当日志达到10MB时切割,保留5个历史文件)
log.file.rotation.size = 10485760
log.file.rotation.count = 5
​
# 方案 B:同时使用大小和日期轮转 (接近10MB或到每天零点都会切割,保留7个历史文件)
log.file.rotation.size = 10485760
log.file.rotation.date = $D0
log.file.rotation.count = 7

仲裁队列

生产仲裁队列的代码

#生产者,仲裁队列
[root@rabbitmq-server1 ~]# vim quorum-Producer.java 
​
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.Map;
​
class Producer {
    // 消息队列名称
    private final static String QUEUE_NAME = "kkk";
​
    public void send() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.135.100");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("zgs666");
        factory.setVirtualHost("/admin");
​
        // 创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
​
            Map<String, Object> queueArgs = new HashMap<>();
            queueArgs.put("x-queue-type", "quorum");  // 设置队列类型为仲裁队列
​
            // 声明队列(持久化 + 仲裁队列)
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
​
            // 发送10000条消息
            for (int i = 0; i < 10000; i++) {
                String message = "Hello World RabbitMQ count: " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
​
    public static void main(String[] args) {
        try {
            new Producer().send();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
​
​
#消费者,仲裁队列
[root@rabbitmq-server1 ~]# vim quorum-Consumer.java 
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.Map;
​
class Consumer {
    // 消息队列名称(与生产者保持一致)
    private final static String QUEUE_NAME = "kkk";
​
    public void receive() throws IOException, TimeoutException {
        // 创建连接工厂(配置与生产者一致)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.135.132");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/admin");
​
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
​
        // 设置仲裁队列参数(与生产者一致)
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-queue-type", "quorum");  // 设置队列类型为仲裁队列
​
        // 声明队列(持久化 + 仲裁队列)
        channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
​
        System.out.println(" [*] 等待仲裁队列消息。退出请按 CTRL+C");
​
        // 创建消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 收到: '" + message + "'");
        };
​
        // 开始消费消息(自动确认模式)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
​
    public static void main(String[] args) {
        try {
            new Consumer().receive();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
​
#使用方式
    
tar -xf jdk-8u171-linux-x64.tar.gz  -C /usr/local/
    
vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_171 
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
​
#下载依赖到当前目录
wget https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.16.0/amqp-client-5.16.0.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar
​
​
编译并运行
#编译
javac -cp .:amqp-client-5.16.0.jar:slf4j-api-1.7.36.jar quorum-Producer.java
​
#运行(生产消息)
java -cp .:amqp-client-5.16.0.jar:slf4j-api-1.7.36.jar quorum-Producer
​
#消费消息也一样,编译消费者代码就可以了

主节点故障模拟

模拟集群仲裁队列主节点意外故障,重启失败的案例

通过命令查看队列成员信息,确定哪个节点是主节点,也可以在web页面查看

从节点故障模拟

模拟集群仲裁队列从节点意外故障,重启失败的案例

模拟故障

#将其中一个从节点关闭
[root@rabbitmq-server1 ~]# systemctl stop rabbitmq-server.service 
​
#然后重启报错
[root@rabbitmq-server1 ~]# systemctl restart rabbitmq-server.service 
Job for rabbitmq-server.service failed because the control process exited with error code.
See "systemctl status rabbitmq-server.service" and "journalctl -xeu rabbitmq-server.service" for details.
​
#下面时报错信息
[root@rabbitmq-server1 rabbit@rabbitmq-server1]# journalctl -xeu rabbitmq-server.service
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              server_recovery_strategy =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                               {rabbit_quorum_queue,system_recover,[]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_compute_checksums => true,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_min_bin_vheap_size => 2971008,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_data_dir =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                               "/var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1/quorum/rabbit@rabbitmq-server1",
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              segment_max_pending => 1024,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_sync_method => datasync,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_garbage_collect => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_pre_allocate => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              server_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              default_max_pipeline_count => 4096,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              receive_snapshot_timeout => 30000,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              low_priority_commands_flush_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              low_priority_commands_in_memory_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              machine_upgrade_strategy => all}]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                          permanent,false,infinity,supervisor,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                          [ra_system_sup]}}}
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - failed to process log event #{meta => #{error_logger => #{tag => info_report,type => std_info,report_cb => fun application_controller:>
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - removing handler syslog_logger_h from logger
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Kernel pid terminated (application_controller) ("{application_start_failure,rabbit,{{{shutdown,{failed_to_start_child,ra_log_sup,{shutdown,{failed_to_st>
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: 
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Crash dump is being written to: /var/log/rabbitmq/erl_crash.dump...done
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Main process exited, code=exited, status=1/FAILURE
░░ Subject: Unit process exited
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ An ExecStart= process belonging to unit rabbitmq-server.service has exited.
░░ 
░░ The process' exit code is 'exited' and its exit status is 1.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
░░ Subject: Unit failed
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ The unit rabbitmq-server.service has entered the 'failed' state with result 'exit-code'.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: Failed to start Open source RabbitMQ server.
░░ Subject: rabbitmq-server.service 单元已失败
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ rabbitmq-server.service 单元已失败。
░░ 
░░ 结果为“failed”。
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Consumed 10.121s CPU time.
░░ Subject: Resources consumed by unit runtime
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ The unit rabbitmq-server.service completed and consumed the indicated resources.
...skipping...
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              server_recovery_strategy =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                               {rabbit_quorum_queue,system_recover,[]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_compute_checksums => true,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_min_bin_vheap_size => 2971008,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_data_dir =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                               "/var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1/quorum/rabbit@rabbitmq-server1",
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              segment_max_pending => 1024,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_sync_method => datasync,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_garbage_collect => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_pre_allocate => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              wal_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              server_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              default_max_pipeline_count => 4096,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              receive_snapshot_timeout => 30000,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              low_priority_commands_flush_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              low_priority_commands_in_memory_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                              machine_upgrade_strategy => all}]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                          permanent,false,infinity,supervisor,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]:                          [ra_system_sup]}}}
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - failed to process log event #{meta => #{error_logger => #{tag => info_report,type => std_info,report_cb => fun application_controller:>
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - removing handler syslog_logger_h from logger
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Kernel pid terminated (application_controller) ("{application_start_failure,rabbit,{{{shutdown,{failed_to_start_child,ra_log_sup,{shutdown,{failed_to_st>
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: 
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Crash dump is being written to: /var/log/rabbitmq/erl_crash.dump...done
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Main process exited, code=exited, status=1/FAILURE
░░ Subject: Unit process exited
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ An ExecStart= process belonging to unit rabbitmq-server.service has exited.
░░ 
░░ The process' exit code is 'exited' and its exit status is 1.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
░░ Subject: Unit failed
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ The unit rabbitmq-server.service has entered the 'failed' state with result 'exit-code'.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: Failed to start Open source RabbitMQ server.
░░ Subject: rabbitmq-server.service 单元已失败
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ rabbitmq-server.service 单元已失败。
░░ 
░░ 结果为“failed”。
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Consumed 10.121s CPU time.
░░ Subject: Resources consumed by unit runtime
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░ 
░░ The unit rabbitmq-server.service completed and consumed the indicated resources.
​
#解决方法
#在健康的节点上将故障节点踢出集群
rabbitmqctl forget_cluster_node rabbit@rabbitmq-server1
​
#清理server1上的数据目录
rm -rf /var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1
​
#重新启动server1服务
systemctl start rabbitmq-server
​
#将server1重新加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq-server2
rabbitmqctl start_app
​
#验证集群状态和队列成员
rabbitmqctl cluster_status
rabbitmq-queues -p /admin quorum_status kkk    #此时server1不会加入,需要手动加入
​
#将server1重新加回仲裁队列的成员列表(在建康节点执行)
rabbitmq-queues add_member -p /admin kkk rabbit@rabbitmq-server1
​
#等待一两分钟,Raft协议会自动向server1同步全量数据
rabbitmq-queues -p /admin quorum_status kkk
image-20260511170523563

状态

含义

做什么

可以有多个吗

leader

领导者

处理所有客户端读写请求。生产者发消息、消费者拉消息,都只经过 Leader。Leader 把操作记入日志,再同步给所有 Follower。

只能有一个

follower

跟随者

被动接收 Leader 的日志,保持数据同步。不直接处理客户端请求(客户端连到 Follower 会自动被重定向到 Leader)。

可以有多个(通常 2 个)

candidate

候选者

节点发起选举时的临时状态。当 Follower 长时间没收到 Leader 心跳,就变成 Candidate,向其他节点索要投票,争取成为新 Leader。

临时存在,选举结束后要么变 Leader 要么退回 Follower

0
博主关闭了所有页面的评论