部署RabbitMQ

这两天开始看RabbitMQ,目前在使用的环境是之前的老兄搭的,只是起了两个节点做了普通的集群,安装方式也比较奇怪。希望能够整理一下,把剩下的节点加进去,把冗余性提上来。

安装步骤

在CentOS6上直接使用EPEL安装。

# 安装Erlang环境
$ rpm -Uvh http://mirror.chpc.utah.edu/pub/epel/6/i386/epel-release-6-8.noarch.rpm
$ yum install erlang
# 安装RabbitMQ
$ wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-3.3.1-1.noarch.rpm
$ rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
$ yum install rabbitmq-server-3.3.1-1.noarch.rpm

配置

  1. Erlang Cookie配置

    将安装时自动生成的/var/lib/rabbitmq/.erlang.cookie文件分发到所有节点上,保持一致。

  2. 启用管理插件

     $ rabbitmq-plugins enable rabbitmq_management
    
  3. 配置环境,将数据写到SSD盘(路径按照实际环境)

     $ mkdir -p /work/rabbitmq/mnesia /work/rabbitmq/log
     $ chown rabbitmq:rabbitmq  -R /work/rabbitmq
     # 新建/etc/rabbitmq/rabbitmq-env.conf文件,写入以下内容
     MNESIA_BASE=/work/rabbitmq/mnesia
     LOG_BASE=/work/rabbitmq/log
    
  4. 启动服务

     service rabbitmq-server start
    
  5. 除主节点以外,其他节点还需执行以下命令组建集群

     $ rabbitmqctl stop_app
     $ rabbitmqctl join_cluster --ram rabbit@rabbitmq01
     $ rabbitmqctl start_app
    
  6. 更新用户及权限

     # 增加管理员admin
     $ rabbitmqctl add_user admin password
     $ rabbitmqctl set_user_tags admin administrator
     # 以后增加vhost时如有需要需要继续安装以下命令设置,或者通过Web管理界面设置
     $ rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
     # 删除RabbitMQ第一次启动时默认建立的guest用户
     $ rabbitmqctl delete_user guest
    

冗余和持久化

按照我目前的理解,RabbitMQ的持久化能力及冗余性从低到高排列如下:

  1. 自动删除

    建立队列时,如果将AUTO DELETE参数设置为”True”,那么当所有连接到该队列的消费者都断开后,队列自动删除。

  2. 默认

    默认交换机及队列建立后不进行持久化,虽然消费端都断开后队列不自动删除,但是当节点重启后,交换机及队列消失。

  3. 持久化

    交换机和队列在建立时可以分别设定为持久化保存,当两者都持久化时,对应的绑定规则也会持久化。持久化的交换机、队列及绑定规则在节点重启后继续存在。

  4. 消息内容持久化

    在3的基础之上,可以设置消息分发模式为2,此时每条消息都会存储在磁盘上,节点重启时队列内容不会消失。如果对可靠性要求不高,可以设置为1,能够加快速度。

  5. 队列镜像

    在普通集群模式,通过其他节点存取某节点建立的队列的消息时,两个节点之间只建立了一个临时通道,消息实际上只存在于建立该队列的节点上。如果队列所在节点宕机,那么该队列内容无法访问,并且由于队列持久化的原因,无法在其他节点重建相同的队列,只能等待故障节点恢复。当开启队列镜像后,队列内容会同步到每个镜像节点之上,任何节点异常队列都可以继续访问。

镜像队列最为安全,但是开销最大,每一条消息的存取都会在所有镜像节点之间同步。实际使用时各个队列根据需求使用不同的持久化及冗余模式。

使用及测试

  1. 测试环境结构

                                 Service IP
                                     |
           +-------------------------+----+      +--------------------+
           |     rabbitmq01       HAProxy |      |     rabbitmq02     |
           |    (192.168.1.1)   +----+----+      |    (192.168.1.2)   |
           |  RabbitMQ          |    | Polling   |  RabbitMQ          |
           |    Disk            |<---+---------->|    Ram             |
           |    Stats           |<-------------->|                    |
           +--------------------+  Cluster Sync  + -------------------+
    
  2. 队列镜像

     $ rabbitmqctl add_vhost tao
     $ rabbitmqctl add_user jasee password
     $ rabbitmqctl set_user_tags jasee management
     $ rabbitmqctl set_permissions -p tao jasee ".*" ".*" ".*"
     # tao vhost的所有队列开启镜像,更多参数见官方文档
     $ rabbitmqctl set_policy -p tao ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    
  3. 搭建HAProxy

     $ yum install haproxy
     $ cat /etc/haproxy/haproxy.cfg
     global
         log         127.0.0.1 local2
         chroot      /var/lib/haproxy
         pidfile     /var/run/haproxy.pid
         maxconn     4000
         user        haproxy
         group       haproxy
         daemon
         stats socket /var/lib/haproxy/stats
    
     defaults
         log                 global
         mode                tcp
         option              tcplog
         option              dontlognull
         retries             3
         option              redispatch
         maxconn             3000
         timeout connect     5s
         timeout client      1d
         timeout server      1d
    
     listen rabbitmq_cluster 0.0.0.0:5670
         mode tcp
         balance roundrobin
         server rabbitmq01 192.168.1.1:5672 check inter 2000 rise 2 fall 3
         server rabbitmq02 192.168.1.2:5672 check inter 2000 rise 2 fall 3
     $ service haproxy start
    
  4. 测试 使用如下的生产者和消费者脚本,反复关闭两个节点中的任一个,生产和消费都可以继续进行。

    • producer.py

        #!/usr/bin/env python
        # -*- coding: utf-8 -*-
      
        from amqplib import client_0_8 as amqp
        import time
      
        conn = amqp.Connection(host="192.168.1.1:5670", userid="jasee", password="password", virtual_host="tao", insist=False)
        chan = conn.channel()
      
        for x in xrange(100000):
            try:
                msg =amqp.Message("this is %s" % x)
                msg.properties["delivery_mode"] = 2
                chan.basic_publish(msg,exchange="test",routing_key="hello")
                time.sleep(1)
            except KeyboardInterrupt:
                print "Stop Send"
                break
            except:
                try:
                    chan.close()
                    conn.close()
                except:
                    pass
                print "Reconnecting..."
                time.sleep(10)
                conn = amqp.Connection(host="192.168.1.1:5670", userid="jasee", password="password", virtual_host="tao", insist=False)
                chan = conn.channel()
                print "Reconnected"
                chan.basic_publish(msg,exchange="test",routing_key="hello")
      
        chan.close()
        conn.close()
      
    • consumer.py

        #!/usr/bin/env python
        # -*- coding: utf-8 -*-
      
        from amqplib import client_0_8 as amqp
        import time
      
        conn = amqp.Connection(host="192.168.1.1:5670", userid="jasee", password="password", virtual_host="tao", insist=False)
        chan = conn.channel()
      
        chan.queue_declare(queue="sayHello", durable=True, exclusive=False, auto_delete=False)
        chan.exchange_declare(exchange="test", type="direct", durable=True, auto_delete=False)
        chan.queue_bind(queue="sayHello", exchange="test", routing_key="hello")
      
        def recv_callback(msg):
            print 'Received: %s' % msg.body
      
        chan.basic_consume(queue="sayHello", no_ack=True, callback=recv_callback,consumer_tag="testConsumer")
        while True:
            try:
                chan.wait()
            except KeyboardInterrupt:
                chan.basic_cancel("testConsumer")
                print "Consumer End"
                chan.close()
                conn.close()
                break
            except:
                try:
                    chan.close()
                    conn.close()
                except:
                    pass
                print "Reconnecting..."
                time.sleep(10)
                conn = amqp.Connection(host="192.168.1.1:5670", userid="jasee", password="password", virtual_host="tao", insist=False)
                chan = conn.channel()
                chan.queue_declare(queue="sayHello", durable=True, exclusive=False, auto_delete=False)
                chan.exchange_declare(exchange="test", type="direct", durable=True, auto_delete=False)
                chan.queue_bind(queue="sayHello", exchange="test", routing_key="hello")
                chan.basic_consume(queue="sayHello", no_ack=True, callback=recv_callback,consumer_tag="testConsumer")
                print "Reconnected"
      

使用Keepalived+LVS

做冗余当然就做完全的冗余,有了HAProxy但是HAProxy本身单点的话出了问题也白搭。之前想要使用Keepalived+HAProxy,但是在发现了怎么样让 LVS 和 realserver 工作在同一台机器上之后,还是决定继续使用LVS+Keepalived的完美组合。LVS比HAProxy简单、干净、纯粹,最重要的是目前使用的比较多。

  1. LVS+Keepalived安装步骤

     $ ln -s /usr/src/kernels/2.6.32-358.18.1.el6.x86_64 /usr/src/linux
     $ tar -zxf ipvsadm-1.24.tar.gz
     $ cd ipvsadm-1.24
     $ make && make install
     $ cd ..
     $ yum install openssl-devel openssl popt-devel
     $ tar -zxf keepalived-1.1.20.tar.gz
     $ cd keepalived-1.1.20
     $ ./configure --prefix=/ && make && make install
    
  2. 配置

    为了节省资源,我们将LVS和RabbitMQ部署在一起,需要在rabbitmq01和rabbitmq02上执行以下命令:

     # 这部分命令在两台服务器上都执行,其中192.168.1.100是VIP
     $ echo 1 > /proc/sys/net/ipv4/conf/lo/arp_ignore
     $ echo 2 > /proc/sys/net/ipv4/conf/lo/arp_announce
     $ echo 1 > /proc/sys/net/ipv4/conf/all/arp_ignore
     $ echo 2 > /proc/sys/net/ipv4/conf/all/arp_announce
     $ ifconfig lo:1 192.168.1.100 netmask 255.255.255.255
     # 此命令在rabbitmq01上执行
     $ iptables -t mangle -I PREROUTING -d 192.168.1.100 -p tcp -m tcp --dport 5672 -m mac ! --mac-source $mac_of_rabbitmq02 -j MARK --set-mark 0x1
     # 此命令在rabbitmq02上执行
     $ iptables -t mangle -I PREROUTING -d 192.168.1.100 -p tcp -m tcp --dport 5672 -m mac ! --mac-source $mac_of_rabbitmq01 -j MARK --set-mark 0x2
    

    以上命令可以写入/etc/rc.local。另外rabbitmq01、02上的Keepalived配置分别为:

    • rabbitmq01

        global_defs {
           router_id rabbitmq01
        }
      
        vrrp_instance rabbitmq {
            state MASTER
            interface eth0
            lvs_sync_daemon_inteface eth0
            virtual_router_id 1
            priority 150
            advert_int 5
            authentication {
                auth_type PASS
                auth_pass passwd
            }
            virtual_ipaddress {
                192.168.1.100
            }
        }
      
        virtual_server fwmark 1 {
                delay_loop 10
                lb_algo wrr
                lb_kind DR
                persistence_timeout 6000
                protocol TCP
      
                real_server  192.168.1.1 5672 {
                        inhibit_on_failure
                        weight 10
                        TCP_CHECK {
                                connect_timeout 3
                                nb_get_retry 3
                                delay_before_retry 3
                                connect_port 5672
                        }
                }
      
                real_server  192.168.1.2 5672 {
                        inhibit_on_failure
                        weight 10
                        TCP_CHECK {
                                connect_timeout 3
                                nb_get_retry 3
                                delay_before_retry 3
                                connect_port 5672
                        }
                }
        }
      
    • rabbitmq02

        global_defs {
           router_id rabbitmq02
        }
      
        vrrp_instance rabbitmq {
            state BACKUP
            interface eth0
            lvs_sync_daemon_inteface eth0
            virtual_router_id 1
            priority 100
            advert_int 5
            authentication {
                auth_type PASS
                auth_pass passwd
            }
            virtual_ipaddress {
                192.168.1.100
            }
        }
      
        virtual_server fwmark 2 {
                delay_loop 10
                lb_algo wrr
                lb_kind DR
                persistence_timeout 6000
                protocol TCP
      
                real_server  192.168.1.1 5672 {
                        inhibit_on_failure
                        weight 10
                        TCP_CHECK {
                                connect_timeout 3
                                nb_get_retry 3
                                delay_before_retry 3
                                connect_port 5672
                        }
                }
      
                real_server  192.168.1.2 5672 {
                        inhibit_on_failure
                        weight 10
                        TCP_CHECK {
                                connect_timeout 3
                                nb_get_retry 3
                                delay_before_retry 3
                                connect_port 5672
                        }
                }
        }
      
  3. 目前的结构

    此时启动两台服务器的Keepalived服务即可,此时测试环境的结构为:

                                            VIP
                                             |
                                 +-----------+--------------+
                                 |                          |
                     ............v...... HeartBeat .........v.........
                     .     Keepalived  .<--------->.     Keepalived  .
                     ..........+........           ...........+.......
                     .        LVS------------+---------------LVS     .
                     .                 .     |     .                 .
                     .         +-----------DR/wrr-------------+      .
                     .         |       .           .          |      .
                     ..........v........           ...........v.......
                     |     rabbitmq01  |           |     rabbitmq02  |
                     |  RabbitMQ       |<--------->|  RabbitMQ       |
                     |    Disk         |   Mirror  |    Ram          |
                     |    Stats        |<--------->|                 |
                     + ----------------+  Cluster  + ----------------+
    
  4. 一些说明

    1. 问题

      能够在RS上同时部署LVS主备的关键点就是前面的两条iptables语句。如果LVS不是按照数据包标签来进行负载均衡和流量转发,那么会碰到以下问题:

      1. 为了保证切换速度,Keepalived会在LVS备机上也启用LVS。
      2. 主LVS在接收到流量之后,有一半的概率将该数据包发往备LVS所在RS节点。
      3. 备LVS所在RS节点收到数据包后,首先经由ip_vs模块处理,又有一半的概率将数据包转发到主LVS所在的RS节点。
      4. 反复以上过程,产生很多无用流量和乱七八糟的转发。
    2. 解决方法

      通过将不是对面LVS发送的、到指定IP和端口的数据包打标签,然后在LVS里配置(见上述keepalived.conf)转发带有该标签的数据包,可以解决该问题,请求过程为:

      1. 客户端发送对VIP的RabbitMQ端口的访问,主LVS上的防火墙规则将该数据打上标签1。
      2. LVS的配置中配置对标签1进行均衡转发,一半的流量直接本机处理,一半的流量转发到备LVS所在RS节点。
      3. 备LVS所在RS节点的防火墙因发现来源MAC地址为主LVS,不会打标签2。
      4. 备LVS的针对标签2的均衡转发不会生效。
      5. 备LVS所在RS的RabbitMQ正常处理该请求。

按照以上设定,LVS主备和RS可以共存,访问RabbitMQ的流量也可以正常进行均衡。Keepalived将会负责心跳检测和故障时的VIP漂移,一台服务器宕机影响不大。

拓扑和其他

目前计划使用的拓扑如下。

                                                    VIP
                                                     |
                                         +-----------+--------------+
                                         |                          |
                             ............v...... HeartBeat .........v.........
                             .     Keepalived  .<--------->.     Keepalived  .
                             ..........+........           ...........+.......
                             .        LVS------------+---------------LVS     .
                             .                 .     |     .                 .
                             .         +-----------DR/wrr-------------+      .
                             .         |       .           .          |      .                  IP
 +-----------------+         ..........v........           ...........v.......         +--------+--------+
 |     rabbitmq01  |         |     rabbitmq02  |           |     rabbitmq03  |         |     rabbitmq04  |
 |  RabbitMQ       |         |  RabbitMQ       |<--------->|  RabbitMQ       |         |  RabbitMQ       |
 |    Disk         |         |    Ram          |   Mirror  |    Ram          |         |    Ram          | ...
 |    Stats        |<------->|                 |<--------->|                 |<------->|                 |
 +-----------------+ Cluster + ----------------+  Cluster  + ----------------+ Cluster + ----------------+

其中对安全性要求较高的消息走VIP,对速度要求较高的消息走集群剩余节点的节点IP。队列镜像需要主动设定,以下为参考命令:

# 对vhost tao中的所有队列启用镜像
$ rabbitmqctl set_policy -p tao ha-tao-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# 取消刚才的设定
$ rabbitmqctl clear_policy -p tao ha-tao-all
# 对vhost tao中的test队列启用镜像
$ rabbitmqctl set_policy -p tao ha-tao-test "test" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# 后面的设定在此处意义不大,为了性能考虑我们只镜像两个节点,基本不存在增加slave节点的操作。

另外,开启队列镜像模式之后对性能的影响还是挺大的。由于线下测试环境较差,数据波动较大,具体数值参考价值不大,只说一下大概。在使用1000字节的消息进行测试时,相比于未开启镜像:

  1. 写入速度下降到30%-40%。
  2. 读取速度下降到70%左右。
  3. 同时读写,速度下降约一半。

100Mb网络+台式机,每秒读写消息在k级别。如果不是因为两个节点中有一个磁盘节点,性能也许会好一些。如果镜像节点更多,性能还会进一步下降,未做测试。对安全性要求不高的队列,还是不要使用镜像模式比较好。

Loading Disqus comments...
Table of Contents