浅谈RabbitMQ

本 wiki 基于 rabbitmq 3.7.16springboot 2.x

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

认识 RabbitMQ

MQ

消息队列(Message Queue) : 一种应用对应用的通讯方法,应用间通过读写队列中的消息就可以完成通信,不需直接通信。从存储消息的角度来看,消息队列也可以理解为消息容器。

MQ 的作用 : 异步削峰解耦

AMQP & JMS

MQ 实现方式 说明
AMQP 应用层协议,不受开发语言等限制。消息模型更加丰富。
JMS Java Message Service 实际指 JMS API ,只能使用 Java 语言。规定了两种消息模型。

RabbitMQ

RabbitMq 是开源的,基于 AMQP 的跨平台跨语言的消息队列。

常见 MQ 产品 说明
ActiveMQ 基于 JMS。
RabbitMQ 基于AMQP协议,基于 erlang 语言开发,稳定性好。
RocketMQ 基于JMS,阿里巴巴产品,目前交由Apache基金会。
Kafka 分布式消息系统,高吞吐量。

使用 RabbitMQ

常用消息模型

ConnectionUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package tk.gushizone.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

/**
* 建立与RabbitMQ的连接
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
// 通过工程获取连接
return factory.newConnection();
}

}

QueueConst.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tk.gushizone.rabbitmq.constant;

public interface QueueConst {

String SIMPLE_QUEUE = "simple_queue";

String WORK_QUEUE = "work_queue";

String FANOUT_QUEUE_1 = "fanout_queue_1";

String FANOUT_QUEUE_2 = "fanout_queue_2";

String DIRECT_QUEUE_1 = "direct_queue_1";

String DIRECT_QUEUE_2 = "direct_queue_2";

String TOPIC_QUEUE_1 = "topic_queue_1";

String TOPIC_QUEUE_2 = "topic_queue_2";

String SPRING_QUEUE = "spring_queue";
}

ExchangeConst.java

1
2
3
4
5
6
7
8
9
10
11
12
package tk.gushizone.rabbitmq.constant;

public interface ExchangeConst {

String FANOUT_EXCHANGE = "fanout_exchange";

String DIRECT_EXCHANGE = "direct_exchange";

String TOPIC_EXCHANGE = "topic_exchange";

String SPRING_EXCHANGE = "spring_exchange";
}

Simple

Simple消息模型 是最简单的生产消费模型。

Simple 消息模型

*Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package tk.gushizone.rabbitmq.sample.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Send {

public static void main(String[] argv) throws Exception {

// 获取连接并创建通道(使用通道才能完成消息相关的操作)
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明(创建)队列
channel.queueDeclare(QueueConst.SIMPLE_QUEUE, false, false, false, null);
// 消息内容
String msg = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QueueConst.SIMPLE_QUEUE, null, msg.getBytes());

log.warn("Producer sent : [{}]", msg);

//关闭通道和连接
channel.close();
connection.close();
}
}
1
Producer sent : [Hello World!]

*Consumer.java

ACK (Acknowledge character) : 确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

自动 ACK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package tk.gushizone.rabbitmq.sample.simple;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QueueConst.SIMPLE_QUEUE, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消息接收处理器(回调函数)
* @param body 消息体
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {

log.warn("Consumer received : [{}]", new String(body));
}
};
// 监听队列,自动ACK
channel.basicConsume(QueueConst.SIMPLE_QUEUE, true, consumer);
}
}
1
Consumer received : [Hello World!]

手动 ACK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package tk.gushizone.rabbitmq.sample.simple;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

@Slf4j
public class Recv2 {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();


channel.queueDeclare(QueueConst.SIMPLE_QUEUE, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);

log.warn("Consumer received : [{}]", new String(body));
}
};
// 监听队列,手动ACK
channel.basicConsume(QueueConst.SIMPLE_QUEUE, false, consumer);
}
}
1
Consumer received : [Hello World!]

Work

Work消息模型Simple 仅仅分别是单体和集群的区别,更强调负载均衡,能者多劳。

Work 消息模型

*Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package tk.gushizone.rabbitmq.sample.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Send {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QueueConst.WORK_QUEUE, false, false, false, null);

// 多次发布任务
for (int i = 0; i < 10; i++) {

String msg = "task .. " + i;
channel.basicPublish("", QueueConst.WORK_QUEUE, null, msg.getBytes());
log.warn("Producer sent [{}]", msg);

Thread.sleep(i * 2);
}

channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
Producer sent [task .. 0]
Producer sent [task .. 1]
Producer sent [task .. 2]
Producer sent [task .. 3]
Producer sent [task .. 4]
Producer sent [task .. 5]
Producer sent [task .. 6]
Producer sent [task .. 7]
Producer sent [task .. 8]
Producer sent [task .. 9]

Recv.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package tk.gushizone.rabbitmq.sample.work;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QueueConst.WORK_QUEUE, false, false, false, null);
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);

DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {

channel.basicAck(envelope.getDeliveryTag(), false);

log.warn("Consumer1 received : [{}]", new String(body));

// 模拟完成任务的耗时
Thread.sleep(100);
}
};

channel.basicConsume(QueueConst.WORK_QUEUE, false, consumer);
}
}
1
2
Consumer1 received : [task .. 0]
Consumer1 received : [task .. 2]

Recv2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package tk.gushizone.rabbitmq.sample.work;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

@Slf4j
public class Recv2 {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(QueueConst.WORK_QUEUE, false, false, false, null);
channel.basicQos(1);

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {

channel.basicAck(envelope.getDeliveryTag(), false);

log.warn("consumer2 received : [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.WORK_QUEUE, false, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
Producer sent [task .. 0]
Producer sent [task .. 1]
Producer sent [task .. 2]
Producer sent [task .. 3]
Producer sent [task .. 4]
Producer sent [task .. 5]
Producer sent [task .. 6]
Producer sent [task .. 7]
Producer sent [task .. 8]
Producer sent [task .. 9]

Fanout

Fanout消息模型 又称之为广播模型, produce 可以通过 交换机(exchange) 把消息发给多个队列和对应 consumer

Fanout 消息模型

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package tk.gushizone.rabbitmq.sample.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Send {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明exchange,指定类型为fanout
channel.exchangeDeclare(ExchangeConst.FANOUT_EXCHANGE, "fanout");

String msg = "Hello World";
// 发布消息到Exchange
channel.basicPublish(ExchangeConst.FANOUT_EXCHANGE, "", null, msg.getBytes());
log.warn("Producer sent [{}]", msg);

channel.close();
connection.close();
}
}
1
Producer sent [Hello World]

Recv.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package tk.gushizone.rabbitmq.sample.fanout;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QueueConst.FANOUT_QUEUE_1, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QueueConst.FANOUT_QUEUE_1, ExchangeConst.FANOUT_EXCHANGE, "");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {

log.warn("Consumer1 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.FANOUT_QUEUE_1, true, consumer);
}
}
1
Consumer1 received [Hello World]

Recv2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package tk.gushizone.rabbitmq.sample.fanout;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv2 {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QueueConst.FANOUT_QUEUE_2, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QueueConst.FANOUT_QUEUE_2, ExchangeConst.FANOUT_EXCHANGE, "");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {

log.warn("Consumer2 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.FANOUT_QUEUE_2, true, consumer);
}
}
1
Consumer2 received [Hello World]

Direct

Direct消息模型 又称之为 路由模型,其是 Fanout 的增强,通过 routingkey ,交换机可以有选择的将消息转发给对应的队列。

Direct 消息模型

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package tk.gushizone.rabbitmq.sample.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Send {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明exchange,指定类型为direct
channel.exchangeDeclare(ExchangeConst.DIRECT_EXCHANGE, "direct");
// 消息内容
String msg = "商品删除, id = 1001";
// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(ExchangeConst.DIRECT_EXCHANGE, "delete", null, msg.getBytes());
log.warn("Producer sent [{}]", msg);

channel.close();
connection.close();
}
}
1
Producer sent [商品删除, id = 1001]

Recv.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package tk.gushizone.rabbitmq.sample.direct;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv {

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QueueConst.DIRECT_QUEUE_1, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
channel.queueBind(QueueConst.DIRECT_QUEUE_1, ExchangeConst.DIRECT_EXCHANGE, "update");
channel.queueBind(QueueConst.DIRECT_QUEUE_1, ExchangeConst.DIRECT_EXCHANGE, "delete");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {
log.warn("Consumer1 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.DIRECT_QUEUE_1, true, consumer);
}
}
1
Consumer1 received [商品删除, id = 1001]

Recv2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package tk.gushizone.rabbitmq.sample.direct;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv2 {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QueueConst.DIRECT_QUEUE_2, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QueueConst.DIRECT_QUEUE_2, ExchangeConst.DIRECT_EXCHANGE, "insert");
channel.queueBind(QueueConst.DIRECT_QUEUE_2, ExchangeConst.DIRECT_EXCHANGE, "update");
channel.queueBind(QueueConst.DIRECT_QUEUE_2, ExchangeConst.DIRECT_EXCHANGE, "delete");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {
log.warn("Consumer2 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.DIRECT_QUEUE_2, true, consumer);
}
}
1
Consumer2 received [商品删除, id = 1001]

Topic

Topic消息模型Direct 的增强版, 让 routingkey 支持通配符。

  • * :一个词。
  • # :多个词。

Topic 消息模型

持久化

  • 队列 | 交换机持久化: durable
  • 消息持久化: MessageProperties.PERSISTENT_TEXT_PLAIN

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tk.gushizone.rabbitmq.sample.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Send {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明 exchange,指定类型为topic (durable: 是否持久化)
channel.exchangeDeclare(ExchangeConst.TOPIC_EXCHANGE, "topic", true);
// 消息内容
String msg = "新增商品 : id = 1001";
// 发送消息,并且指定routing key 为:insert ,代表新增商品 (消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN)
channel.basicPublish(ExchangeConst.TOPIC_EXCHANGE, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
log.warn("Producer sent [{}]", msg);

channel.close();
connection.close();
}
}
1
Producer sent [新增商品 : id = 1001]

Recv.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package tk.gushizone.rabbitmq.sample.topic;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列 (durable: 是否持久化)
channel.queueDeclare(QueueConst.TOPIC_QUEUE_1, true, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
channel.queueBind(QueueConst.TOPIC_QUEUE_1, ExchangeConst.TOPIC_EXCHANGE, "item.update");
channel.queueBind(QueueConst.TOPIC_QUEUE_1, ExchangeConst.TOPIC_EXCHANGE, "item.delete");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {
log.warn("Consumer1 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.TOPIC_QUEUE_1, true, consumer);
}
}
1
2


Recv2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package tk.gushizone.rabbitmq.sample.topic;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
public class Recv2 {

public static void main(String[] argv) throws Exception {

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列 (durable: 是否持久化)
channel.queueDeclare(QueueConst.TOPIC_QUEUE_2, true, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QueueConst.TOPIC_QUEUE_2, ExchangeConst.TOPIC_EXCHANGE, "item.*");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) {
log.warn("Consumer2 received [{}]", new String(body));
}
};

channel.basicConsume(QueueConst.TOPIC_QUEUE_2, true, consumer);
}
}
1
Consumer2 received [新增商品 : id = 1001]

Spring AMQP

spring amqp 是对AMQP协议的抽象实现,而 spring-rabbit 是对协议的具体实现,也是目前的唯一实现。

核心配置

pom.xml

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

application.yml

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
username: test
password: test
virtual-host: /test
port: 5672

@RabbitListener

@RabbitListener 说明
bingdings 指定绑定关系,可以有多个。值是@QueueBinding的数组。
@QueueBinding 说明
value 这个消费者关联的队列,使用 @Queue 指定队列。
exchange 队列所绑定的交换机,使用 @Exchange指定交换机。
key 即 RoutingKey。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package tk.gushizone.rabbitmq.spring;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.constant.QueueConst;

@Slf4j
@Component
public class Listener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QueueConst.SPRING_QUEUE, durable = "true"),
exchange = @Exchange(value = ExchangeConst.SPRING_EXCHANGE,
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC),
key = {"#.#"}))
public void listen(String msg) {

log.warn("接收到消息:[{}]", msg);
}
}
1
接收到消息:[hello, Spring boot amqp]

AmqpTemplate

AmqpTemplate 可以十分方便的发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tk.gushizone.rabbitmq.test.spring;

import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import tk.gushizone.rabbitmq.constant.ExchangeConst;
import tk.gushizone.rabbitmq.spring.RabbitMqApplication;
import tk.gushizone.rabbitmq.util.ConnectionUtil;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqApplication.class)
public class RabbitMqApplicationTest {

@Autowired
private AmqpTemplate amqpTemplate;

public void testSend() throws InterruptedException {

String msg = "hello, Spring boot amqp";
amqpTemplate.convertAndSend(ExchangeConst.SPRING_EXCHANGE, "a.b", msg);

}
}

附录

[Mac] 安装 RabbitMQ

仅介绍基于 hombrew 的安装方法。

安装并启动后,使用默认用户 guest/guest ,访问 http://localhost:15672/

1
2
3
4
5
# 安装 rabbitmq
$ brew install rabbitmq

# 启动,验证 http://localhost:15672/
$ brew services start rabbitmq

常见问题处理

常见问题 解决方式
消息丢失 1)手动 ACK 。2)持久化:消息 \ 队列 \ 交换机。
消息重复 1)业务幂等。2)使消息拥有唯一不变 key,redis 记录。
消息顺序 主动分配队列,一个队列一个消费者。