总结rabbitmq的安装方法,如何集成到springboot项目里面

安装

centos7安装

1.首先安装erlang

# 下载erlang源码
wget http://erlang.org/download/otp_src_21.1.tar.gz
tar -zxvf otp_src_21.1.tar.gz
cd otp_src_21.1
# erlang编译安装默认是装在/usr/local下的bin和lib中,这里我们将他统一装到/usr/local/erlang中,方便查找和使用
mkdir -p /usr/local/erlang

# 在编译之前,必须安装以下依赖包
yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel

# 配置编译安装
./configure --prefix=/usr/local/erlang
make && make install

# 将/usr/local/erlang/bin目录加入到环境变量中
vim /etc/profile
---------------------------------
PATH=$PATH:/usr/local/erlang/bin
---------------------------------
source /etc/profile

2.安装对应版本的rabbitmq,rabbitmq和erlang版本对应关系可以查看说明

# 下载解压到/usr/local/目录下
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.14/rabbitmq-server-generic-unix-3.7.14.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.14.tar.xz -C /usr/local/
# 将sbin目录加入到环境变量中方便执行
vim /etc/profile
-------------------------------------------------
PATH=$PATH:/usr/local/rabbitmq_server-3.7.14/sbin
-------------------------------------------------
source /etc/profile
# 启用web管理界面
rabbitmq-plugins enable rabbitmq_management

# 从github上下载配置文件
cd /usr/local/rabbitmq_server-3.7.14/etc/rabbitmq
wget https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/master/docs/rabbitmq.conf.example
mv rabbitmq.conf.example rabbitmq.conf
wget https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/master/docs/advanced.config.example
mv advanced.config.example advanced.config
# 在配置文件中将guest用户解除禁用
sed -i 's/^# loopback_users.guest = true$/loopback_users.guest = false/' rabbitmq.conf

# 常用命令
# 启动停止
# rabbitmq-server命令参考:https://www.rabbitmq.com/rabbitmq-server.8.html
rabbitmq-server start  # 前台启动
rabbitmq-server start_app  # 前台启动
rabbitmq-server -detached  # 后台启动
rabbitmq-server start &  # 后台启动
rabbitmqctl stop
rabbitmqctl stop_app
rabbitmqctl status
# 插件管理
rabbitmq-plugins list
rabbitmq-plugins enable XXX   # XXX为插件名
rabbitmq-plugins disable XXX
# 用户管理
rabbitmqctl add_user username password
rabbitmqctl delete_user username
rabbitmqctl change_password username newpassword
rabbitmqctl set_user_tags username tag  # 设置用户角色
rabbitmqctl list_users
# 权限管理
rabbitmqctl list_permissions
rabbitmqctl list_user_permissions username
rabbitmqctl clear_permissions [-p vhostpath] username
# 这个时候guest用户只有登录UI的权限,但是没有数据读写的权限,通过下面命令来赋予权限之后才能通过代码来操作数据
# rabbitmqctl set_permissions -p / guest ".*" ".*" ".*"
rabbitmqctl set_permissions [-p vhostpath] username conf write read
# conf: 一个正则匹配哪些资源能被该用户访问
# write:一个正则匹配哪些资源能被该用户写入
# read:一个正则匹配哪些资源能被该用户读取

rabbitmqctl reset application  # 重置

# 集群操作
rabbitmqctl cluster_status  # 集群状态
rabbitmqctl forget_cluster_node rabbit@rabbit3  # 节点摘除

可能会出现rabbitmqctl无法连接本机的情况,需要检查本机hostname

hostname
hostnamectl set-hostname <HOST_NAME>
vim /etc/hosts
------------------------------
127.0.0.1 <HOST_NAME>
------------------------------
ping <HOST_NAME>

docker方式安装

docker pull rabbitmq:3.7.16-management
docker run -d --hostname my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7.16-management
# 访问http://{host}:15672,默认username/pwd为guest/guest

集群搭建

rabbitmq有两种集群模式,普通集群和镜像集群。普通集群不能高可用,一般生产使用镜像集群实现高可用。镜像集群需要以普通集群作为基础。

集群中必须有一个磁盘节点,如果只有一个磁盘节点,而且这个磁盘节点挂了,集群依然可以运行,但是不能更改任何东西

  • 不能创建队列
  • 不能创建交换器
  • 不能创建绑定
  • 不能添加用户
  • 不能更改权限
  • 不能添加和删除集群几点

普通集群

准备2台centos7机器,按照第一个安装步骤全部安装好rabbitmq,在/etc/hosts中互相配置对方主机名称和ip地址,保证可以通过主机名称ping通对方,假设主机名称为rabbit1、rabbit2

vim /etc/hosts
-----------------------
192.168.237.128 rabbit1
192.168.237.129 rabbit2
-----------------------

以上centos7上安装rabbitmq是通过源码来安装的,在home目录下面会存在一个erlang的文件$home/.erlang.cookie,如果是通过yum来安装的话,这个文件应该在/var/lib/rabbitmq/.erlang.cookie,或者直接通过find -name .erlang.cookie命令来查找一下文件位置,集群节点是通过这个erlang cookie文件来进行通信的,这个文件的内容必须一致,这里将主机rabbit2上的.erlang.cookie内容手动修改成与rabbit1一致。

# 启动主机rabbit1上的rabbitmq-server
rabbitmq-server -detached
# 主机rabbit2上先不启动,执行加入集群操作,加入到rabbit1,加入之后再启动,如果有第三个主机rabbit3也想加入集群,操作一样
rabbitmq-server stop
rabbitmqctl join_cluster rabbit@rabbit1 --ram  # @之前的rabbit为固定单词,@之后是rabbit1的主机名称,RAM节点,默认Disk节点(集群中必须有一个是Disk节点)
rabbitmq-server -detached

# 查看集群状态
rabbitmqctl cluster_status

# 将节点移除
rabbitmqctl forget_cluster_node rabbit@rabbit1

成功加入集群之后随便访问哪个web ui,都可以看到集群中2个节点信息

http://192.168.237.128:15672
http://192.168.237.129:15672

镜像集群

在搭建完成一个普通集群的基础上执行:

# 命令说明:
# 在vhost名称为“my-vhosts” 上创建了一个策略,策略名称为my-policy,
# 策略正则表达式为 “^” 表示所有匹配所有队列名称,
# 策略模式为 all 即复制到所有节点,包含新增节点。
rabbitmqctl set_policy -p my-vhosts my-policy"^" '{"ha-mode":"all"}'

除去上面命令设置的方式以外,在web ui上admin>policy>Add / update a policy也可以添加

在web ui上Queues下面创建一个测试队列,在队列列的Node列,这个队列名称后面显示蓝色+1则代表该队列已同步到集群其它节点

查看镜像队列:

rabbitmqctl list_policies

删除镜像队列:

rabbitmqctl clear_policy

CAP原则、Base理论

CAP原则(CAP定理)、BASE理论

CAP原则

在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。

CAP.png

CAP三个特性只能满足其中两个,那么取舍的策略就共有三种:

CA without P:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。传统的关系型数据库RDBMS:Oracle、MySQL就是CA。

CP without A:如果不要求A(可用),相当于每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。设计成CP的系统其实不少,最典型的就是分布式数据库,如Redis、HBase等。对于这些分布式数据库来说,数据的一致性是最基本的要求,因为如果连这个标准都达不到,那么直接采用关系型数据库就好,没必要再浪费资源来部署分布式数据库。

AP wihtout C:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。典型的应用就如某米的抢购手机场景,可能前几秒你浏览商品的时候页面提示是有库存的,当你选择完商品准备下单的时候,系统提示你下单失败,商品已售完。这其实就是先在 A(可用性)方面保证系统可以正常的服务,然后在数据的一致性方面做了些牺牲,虽然多少会影响一些用户体验,但也不至于造成用户购物流程的严重阻塞。

Base

BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的简写,BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的结论,是基于CAP定理逐步演化而来的,其核心思想是即使无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。接下来我们着重对BASE中的三要素进行详细讲解。

概念

rabbitmq.png

rabbitmq.jpg

exchange

消息交换机,指定消息按什么规则,路由到哪个队列,有四种exchange:

  • direct(默认)

该类型路由规则会将消息路由到routing key与binding key完全匹配的Queue中,如果多个消费者监听同一个Queue,则Queue中的消息会被负载均衡到各个消费者上,而不是每个消费者都获取到队列中的每一条消息。

rabbitmq_exchange_direct.png

  • topic

与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配,如果多个消费者监听同一个Queue,则Queue中的消息会被负载均衡到各个消费者上,而不是每个消费者都获取到队列中的每一条消息。

rabbitmq_exchange_topic.png

  • fanout

该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能

rabbitmq_exchange_fanout.png

  • headers

该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配

queue

创建队列的时候可以指定一些arguments,在RMQ的管理平台手动创建队列时,可以用快捷键来创建这些arguments:

argument 说明
x-message-ttl message在队列中可以存活多长时间,以毫秒为单位;发布的消息在queue时间超过了你设定的时间就会被丢弃或转发到DLX(dead letter exchange)
x-expires 设置当前的queue在指定的时间内,没有consumer、basic.get也就是未被访问,就会被删除
x-max-length 设置队列最大长度,超过则被丢弃或转发到DLX(dead letter exchange)
x-max-length-bytes 设置队列消息最大字节长度,超过则被丢弃或转发到DLX(dead letter exchange)
x-overflow 设置队列溢出时的行为:
reject-publish拒绝接受所有消息
drop-head丢弃头部的消息
x-dead-letter-exchange 设置DLX名称,消息过期或者超过最大长度则被投递到死信交换机
x-dead-letter-routing-key 设置DLX的routing-key
x-max-priority 设置队列消息的最大优先级(0-255)
x-queue-mode 设置是否是lazy queue:
default
lazy
https://www.rabbitmq.com/lazy-queues.html
lazy queue的信息尽可能的都保存在磁盘上,仅在消费者请求的时候才会加载到RAM中。对于持久化的消息,重启之后消息依然在,但是对于非持久化的消息,虽然lazy queue也会将消息存储到磁盘,但是重启之后不会保留
x-queue-master-locator https://www.rabbitmq.com/ha.html#queue-master-location
设置如何定位主队列在集群中哪个节点上的策略
min-masters
client-local
random

队列创建的时候可以指定durable=1,标明可以持久化,重启之后队列依然存在。但是队列里面的消息是否还在取决于消息是否设置了持久化。

消息的持久化可以通过delivery_mode=2属性来设置,1非持久化,2持久化(默认持久化)。通过java代码设置的方式如下:

@Autowired
private AmqpTemplate amqpTemplate;

public void sendMessage(String msg) {
    this.amqpTemplate.convertAndSend("myExchange", "myRoutingKey", msg, message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        return message;
    });
}

binding

作用是把exchange和queue按照路由规则绑定起来

Routing Key

路由关键字,exchange根据这个关键字进行消息投递

vhost

虚拟主机之间是相互隔离的,用户在创建的时候可以指定只能访问某个虚拟主机下面的资源(不指定则默认/),这些资源包括exchange,queue,不同用户可以通过vhost来做权限分离

# 添加虚拟主机
rabbitmqctl add_vhost VHOST_NAME
# 删除虚拟主机
rabbitmqctl delete_vhost VHOST_NAME
# 查看虚拟主机列表
rabbitmqctl list_vhosts
# 创建用户
rabbitmqctl add_user username password
# 分配角色(administrator/monitoring/management)
rabbitmqctl set_user_tags username administrator
# 设置权限(分别代表:配置权限、写权限、读权限)
rabbitmqctl set_permissions -p VHOST_NAME username ".*" ".*" ".*"
# 删除用户
rabbitmqctl delete_user username
# 修改密码
rabbimqctl change_password username newpassword

ack(acknowledge)机制、qos设置

ack

消息的消费者默认情况下是自动ack消息确认的,但是有些情况需要手动确认

对于springboot应用可以通过配置来设置ack模式,一共有三种模式可选:manual,auto,none

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

这样消费者使用完消息之后必须通过channel手动去确认消息已收到,如果没有确认,那rmq会继续把消息投递给该消费者

@RabbitListener(queues = "helloQueue")
public void process(String hello, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    System.out.println("Receiver1  : " + hello);
}

当然消费者也可以选择拒绝消息

//应答消息,ack返回true,第二个参数false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

//应答消息,ack返回false,最后一个参数reenqueue代表是否消息重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

//拒绝消息,最后一个参数reenqueue表示是否把消息重新放入到队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

//重新投递消息,跟上面3种方式不同的是,这个方法不会返回deliveryTag
channel.basicRecover();

qos

默认情况下消费者会自动应答,也就是acknowledge-mode=auto,并且当消费者连接上broker之后broker会把消息一次性全部投递给消费者。其实通过配置可以实现broker一次仅投递固定数量的消息给消费者,消费者应答之后再投递下一批数据。如果消费者没有应答,这批消息会还回到队列。

//通过消费者channel直接设置一次只拿一条数据
channel.basicQos(1);

//或者通过listener来配置
spring.rabbitmq.listener.direct.prefetch=1
spring.rabbitmq.listener.simple.prefetch=1

springboot项目中使用rabbitmq示例

示例代码

pom依赖:

<properties>
    <springboot.version>2.0.3.RELEASE</springboot.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${springboot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

rabbitmq的基础配置代码,主要配置rabbitmq连接信息,注入RabbitTemplate对象

@Configuration
public class RabbitBasicConfig {
    private static Logger log = LoggerFactory.getLogger(RabbitBasicConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        /**
         * 若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true
         * 每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
         * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
         */
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        /**
         * Mandatory是channel发布消息时的参数,主要用在当发布消息时,在不可达目的地时的消息如何处理,
         * mandatory默认是false,当exchange无法根据类型和routing找到符合要求的队列时,消息将直接被丢弃;
         * mandatory为true时,当出现找不到符合要求的队列时,消息将直接返回给发送者,
         * 配合ReturnListener使用,如果是spring集成amqp,则是ReturnCallback
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 如果消息没有到exchange,则confirm回调,ack=false
         * 如果消息到达exchange,则confirm回调,ack=true
         * exchange到queue成功,则不回调return
         * exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,消息就丢了)
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    log.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

    /**
     * 等同于在配置文件中配置:
     * spring.rabbitmq.host=192.168.255.138
     * spring.rabbitmq.port=5672
     * spring.rabbitmq.username=guest
     * spring.rabbitmq.password=guest
     * spring.rabbitmq.virtual-host=/
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.255.138");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
}

direct exchange示例

rabbitmq默认有一个exchange,类型是direct,如果发送消息时不声明一个新的exchange,则默认使用这个direct exchange,两种方式都可以。

首先声明测试需要用到的queue以及自定义的exchange和binding

@Configuration
public class DirectExchangeConfig {

    //使用默认的direct exchange的测试队列
    @Bean
    public Queue helloQueue() {
        return new Queue("helloQueue");
    }
    @Bean
    public Queue userQueue() {
        return new Queue("userQueue");
    }

    //使用自定义的direct exchange和binding的测试队列
    @Bean
    public Queue dirQueue() {
        return new Queue("direct");
    }


    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingExchangeDirect(@Qualifier("dirQueue") Queue dirQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
    }
}

其实不直接通过@Bean声明queue,exchange,binding,而是在消费者@RabbitListener注解上通过参数来声明也是可以的,例如:

@RabbitListener(queuesToDeclare = @Queue("myQueue"))

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("myQueue"),
        exchange = @Exchange("myExchange"),
        key = "myRoutingKey"
))

创建一个producer生产字符串消息,两个consumer来消费字符串消息(rabbitmq仅支持同一个队列的一条消息分发给一个消费者,如果多个消费者同时监听一个队列,那队列消息会负载均衡分发到所有的消费者上)

/**
 * 使用默认的direct exchange发送消息
 */
@Component
public class HelloSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String sendMsg = "hello1 " + new Date();
        System.out.println("Sender1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }
}


/**
 * 使用默认的direct exchange接收消息
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}


/**
 * 使用默认的direct exchange接收消息
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }
}

上面是收发字符串消息,也可以收发java对象(需要实现Serializable)

public class User implements Serializable {
    private String name;
    private String pass;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", pass='" + pass + '\'' +
                '}';
    }
}


/**
 * 使用默认direct exchange发送对象
 */
@Component
public class UserSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(User user) {
        System.out.println("user send : " + user.getName()+"/"+user.getPass());
        this.rabbitTemplate.convertAndSend("userQueue", user);
    }
}


/**
 * 使用默认direct exchange接收对象
 */
@Component
@RabbitListener(queues = "userQueue")
public class UserReceiver {
    @RabbitHandler
    public void process(User user){
        System.out.println("user receive  : " + user.getName()+"/"+user.getPass());
    }
}

使用自定义的direct exchange来收发消息

/**
 * 使用自定义direct exchange来发送消息
 */
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        this.rabbitTemplate.convertAndSend("directQueue", "directSender: hello world");
    }
}


/**
 * 使用自定义direct exchange来接收消息
 */
@Component
@RabbitListener(queues = "directQueue")
public class DirectReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("directReceiver: " + msg);
    }
}

测试controller

/**
 * 测试使用默认direct exchange收发消息
 */
@RestController
public class DefaultDirectExchangeController {
    @Autowired
    HelloSender helloSender;

    @Autowired
    UserSender userSender;

    @RequestMapping("/hello")
    public void hello() { helloSender.send(); }

    @RequestMapping("/user")
    public void user(){
        User user = new User();
        user.setName("zhangsan");
        user.setPass("123");
        userSender.send(user);
    }
}


/**
 * 测试使用自定义direct exchange收发消息
 */
@RestController
public class CustomDirectExchangeController {
    @Autowired
    DirectSender directSender;

    @GetMapping("/testCustomDirect")
    public void send(){
        directSender.send();
    }
}

topic exchange示例

首先声明一个topic exchange,以及binding

@Configuration
public class TopicExchangeConfig {
    // Bean默认的name是方法名
    @Bean(name = "topicQueue1")
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    @Bean(name = "topicQueue2")
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }



    @Bean
    TopicExchange exchange() {
        // 参数1为交换机的名称
        return new TopicExchange("topicExchange");
    }



    /**
     * 将队列topic.queue1与topicExchange绑定,routing_key为topic.message,就是完全匹配
     */
    @Bean
    // 如果参数名和上面用到方法名称一样,可以不用写@Qualifier
    Binding bindingExchangeMessage(@Qualifier("topicQueue1") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.queue1");
    }
    /**
     * 将队列topic.queue2与topicExchange绑定,routing_key为topic.#,模糊匹配
     */
    @Bean
    Binding bindingExchangeMessages(@Qualifier("topicQueue2") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

创建消息生产者和消费者

/**
 * 使用topic exchange通过不同的routingKey将消息投送到不同的queue
 */
@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("topicExchange", "topic.queue1", "message: hello world,routing Key: topic.queue1");
        rabbitTemplate.convertAndSend("topicExchange", "topic.test", "message: hello world,routing Key: topic.test");
    }
}

/**
 * 接收topic exchange通过不同的routingKey投送过来的消息
 */
@Component
@RabbitListener(queues = "topic.queue1")
public class TopicReceiver1 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("TopicReceiver1: " +msg);
    }
}

/**
 * 接收topic exchange通过不同的routingKey投送过来的消息
 */
@Component
@RabbitListener(queues = "topic.queue1")
public class TopicReceiver1_1 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("TopicReceiver1_1: " +msg);
    }
}

/**
 * 接收topic exchange通过不同的routingKey投送过来的消息
 */
@Component
@RabbitListener(queues = "topic.queue2")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("TopicReceiver2: " +msg);
    }
}

测试controller

@RestController
public class TopicExchangeController {

    @Autowired
    TopicSender topicSender;

    @RequestMapping("/testTopic")
    public void testTopic() { topicSender.send(); }
}

topic测试的示例图大致如下,其中有两个消费者同时监听队列queue1,queue1中的消息会负载均衡地投送给这两个消费者。

topic_test.jpg

fanout exchange示例

先定义三个queue,都绑定到同一个fanout exchange上,发送给fanout exchange的消息会被广播给绑定的所有队列

@Configuration
public class FanOutExchangeConfig {
    @Bean(name = "AMessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }



    /**
     * //配置广播路由器
     *
     * @return FanoutExchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 参数1为交换机的名称
        return new FanoutExchange("fanoutExchange");
    }



    @Bean
    Binding bindingExchangeA(@Qualifier("AMessage") Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

消息生产者和消费者

/**
 * 使用fanout exchange广播消息
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send() {
        // 参数2被忽略
        this.rabbitTemplate.convertAndSend("fanoutExchange","", "fanout message");
    }
}

/**
 * 使用fanout exchange接收广播消息
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }
}

/**
 * 使用fanout exchange接收广播消息
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }
}

/**
 * 使用fanout exchange接收广播消息
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }
}

测试controller

@RestController
public class FanoutExchangeController {

    @Autowired
    FanoutSender fanoutSender;

    @RequestMapping("/testFanout")
    public void testFanout() { fanoutSender.send(); }
}

消息延时

典型应用场景:下完订单后10分钟未支付则取消订单。

示例代码

1. 使用rabbitmq官方提供的延时消息插件

RabbitMQ Delayed Message Plugin插件的github说明中了解到这个插件现在官方还没有正式声明可以用于生产环境。

下载页面下载插件zip包,解压后的.ez文件放置到/usr/local/rabbitmq_server-3.7.14/plugins目录下面。

启用插件,不需要重新启动rabbitmq-server

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

springboot项目中实现延时消息

首先声明延时exchange,queue,binding:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * 使用rabbitmq-delayed-message-exchange插件实现延迟消息
 * https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
 */
@Configuration
public class DelayedExchangeConfig {
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("test_delay_queue", true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with("test_delay_queue").noargs();
    }
}

创建消息的生产者和消费者:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedMessageSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 通过MessagePostProcessor给消息添加一个超时时间,这个消息发送给delayed exchange之后到期后会被投递到对应的队列
     */
    public void sendMsg() {

        System.out.println("消息发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

        rabbitTemplate.convertAndSend("test_exchange", "test_delay_queue", "hello world", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay",3000);
                return message;
            }
        });
    }
}



import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedMessageReceiver {

    @RabbitListener(queues = "test_delay_queue")
    public void receive(String msg) {

        System.out.println("消息接收时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

        System.out.println("接收到的消息:"+msg);
    }
}

测试controller:

@RestController
public class DelayedExchangeController {
    @Autowired
    DelayedMessageSender delayedMessageSender;

    @GetMapping("/testDelayedExchange")
    public void testDelayedExchange(){
        delayedMessageSender.sendMsg();
    }
}

可以观察到消息发送到exchange之后,ConfirmCallbackReturnCallback这两个回调方法都被调用了,ReturnCallback方法一般情况下在消息被exchange投递到queue投递失败的情况下会被调用,但是在这里针对delayed exchange消息投递之前也会调用一次,告诉生产者消息已经被收到并持久化,过超时时间之后才会被投递到对应消息队列。

2. 使用DLX、TTL实现消息延时

rabbitmq允许对一个queue设置超时时间属性x-expires,同时也允许对一个message设置超时时间属性x-message-ttl,如果同时设置了,以时间短的为准。当消息超时之后,如果队列设置了x-dead-letter-exchangex-dead-letter-routing-key属性,那么消息会被重新投递到这2个属性指定的exchange(DLX,dead-letter-exchange,这个exchange就是一个普通的exchange),这样就间接实现了消息延时处理功能。

springboot项目中实现DLX

测试DLX(dead letter exchange)的示意图:

rabbitmq_dlx.jpg

首先声明需要延迟处理的exchange,queue,binding,以及实际消息最终投递的目的地exchange,queue,binding:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DeadLetterExchangeConfig {
    /**
     * 延迟队列 TTL 名称
     */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
     * DLX,dead letter发送到的 exchange
     * 延时消息就是发送到该交换机的
     */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
     * routing key 名称
     * 具体消息发送在该 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";

    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";

    /**
     * 延迟队列配置
     * <p>
     * 1、params.put("x-message-ttl", 5 * 1000);
     * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
     **/
    @Bean
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
        return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    /**
     * DLX,dead letter发送到的 exchange
     * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
     * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
     * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }









    @Bean
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME, true);
    }
    /**
     * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
     * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
     **/
    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(ORDER_EXCHANGE_NAME);
    }

    @Bean
    public Binding orderBinding() {
        // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
}

创建消息的生产者和消费者:

import com.qigang.rabbit.sb.config.DeadLetterExchangeConfig;
import com.qigang.rabbit.sb.dto.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DeadLetterSender {

    Logger log = LoggerFactory.getLogger(DeadLetterReceiver.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendDelay(Order order) {
        log.info("【订单发送】时间 :{},内容:{}", new Date(), order.toString());
        this.amqpTemplate.convertAndSend(DeadLetterExchangeConfig.ORDER_DELAY_EXCHANGE, DeadLetterExchangeConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
            // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
            message.getMessageProperties().setExpiration(5000 + "");
            return message;
        });
    }
}


import com.qigang.rabbit.sb.config.DeadLetterExchangeConfig;
import com.qigang.rabbit.sb.dto.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DeadLetterReceiver {

    Logger log = LoggerFactory.getLogger(DeadLetterReceiver.class);

    @RabbitListener(queues = {DeadLetterExchangeConfig.ORDER_QUEUE_NAME})
    public void orderDelayQueue(Order order) {
        log.info("【订单接收】时间:{},内容:{}", new Date(), order.toString());
    }
}

测试controller:

@RestController
public class DeadLetterExchangeController {
    @Autowired
    DeadLetterSender deadLetterSender;

    @GetMapping("/testDeadLetterExchange")
    public void testDeadLetterExchange(){
        Order order = new Order();
        order.setOrderId("123456");
        order.setOrderName("order-123456");
        order.setOrderStatus(0);
        deadLetterSender.sendDelay(order);
    }
}

消息优先级

创建queue的时候可以指定队列最大优先级的数值x-max-priority,通过代码或者直接在RMQ的管理平台创建队列的时候指定x-max-priority这个argument,数值在0-255之间。通过代码的方式声明队列优先级的方式如下:

@Bean
public Queue helloQueue() {
    Map<String, Object> params = new HashMap<>();
    params.put("x-max-priority", 10);
    return new Queue("helloQueue",true,false,false,params);
}

对于message,默认优先级为0,通过代码方式设置消息优先级的方式如下:

@Autowired
private AmqpTemplate amqpTemplate;

public void sendDelay(String msg) {
    this.amqpTemplate.convertAndSend("myExchange", "myRoutingKey", msg, message -> {
        message.getMessageProperties().setPriority(50);
        return message;
    });
}

注意:

  • 只有队列设置了最大优先级,消息的优先级才生效,消息的优先级如果大于队列最大优先级数值则按照队列最大优先级数值来
  • 在队列没有阻塞的情况下优先级没有任何意义,测试的时候可以先发送带优先级的消息到队列,消费者延迟一段时间再启动,这样才能看出来消费消息的时候是按照优先级高的先消费

消息可靠性

通过消息的可靠发送以及可靠消费,可以实现分布式事务的功能

可靠发送

生产者事务(不推荐)

事务的实现主要是对producer端的信道(Channel)的设置,主要的方法有三个:

  • channel.txSelect()声明启动事务模式;
  • channel.txComment()提交事务;
  • channel.txRollback()回滚事务;

事务模式的性能非常差,producer需要等待broker阻塞并且成功将消息持久化之后才算发布消息成功

生产者confirm机制(推荐)

  • channel.waitForConfirms()普通发送方确认模式;

    // 创建连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // 创建信道
    Channel channel = conn.createChannel();
    // 声明队列
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // 开启发送方确认模式
    channel.confirmSelect();
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    if (channel.waitForConfirms()) {
        System.out.println("消息发送成功" );
    }
    
  • channel.waitForConfirmsOrDie()批量确认模式;

    // 创建连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // 创建信道
    Channel channel = conn.createChannel();
    // 声明队列
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
    System.out.println("全部执行完成");
    
  • channel.addConfirmListener() 异步监听发送方确认模式;

    // 创建连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(config.UserName);
    factory.setPassword(config.Password);
    factory.setVirtualHost(config.VHost);
    factory.setHost(config.Host);
    factory.setPort(config.Port);
    Connection conn = factory.newConnection();
    // 创建信道
    Channel channel = conn.createChannel();
    // 声明队列
    channel.queueDeclare(config.QueueName, false, false, false, null);
    // 开启发送方确认模式
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
        String message = String.format("时间 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
    }
    //异步监听确认和未确认的消息
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("未确认消息,标识:" + deliveryTag);
        }
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
        }
    });
    

可靠消费

ack机制,设置手动ack机制,消息处理完毕之后再确认

消息幂等

为了防止消费者重复消费消息,在消费端使用MessageID来进行幂等判断,或者判断业务数据状态或者ID

实现RPC调用

Remote procedure call (RPC)

远程过程调用RPC

问题

如何保证消息顺序

  1. 单个producer,单个queue,单个consumer

  2. consumer接收到消息处理完成之后给producer应用发条消息

消息积压的处理办法

如果消费者太少,或者消费者使用了手动ack机制并且处理速度慢,可能会造成消费的速度远远小于生产的速度,造成消息积压,broker内存打满进入保护状态。

  1. 设置并发消费的两个属性:concurrentConsumersprefetchCount

  2. consumer扩容

  3. 对于不重要的消息写脚本清理

    #!/bin/bash
    QUE=`rabbitmqctl list_queues  messages_ready name durable|grep -v "^Listing" |grep -v "^Timeout"`
    echo "$QUE" | while read line
    do
    ready=`echo "$line" | awk -F' ' '{print $1}'`
    name=`echo "$line" | awk -F' ' '{print $2}'`
    durable=`echo "$line" | awk -F' ' '{print $3}'`
    if (( "$ready" >= 1000 )) && (( "$durable" == true ));then
          echo "--------------"
          rabbitmqadmin -q delete queue name="$name"  && echo "删除成功!"
    fi
    done
    

参考

Downloading and Installing RabbitMQ

Centos7 安装rabbitmq详细教程

RabbitMQ笔记

rabbitmq——prefetch count

springboot整合rabbitmq

SpringBoot使用RabbitMQ延时队列

Spring Boot RabbitMQ 延时消息实现

RabbitMQ之消息持久化

消息确认Ack