设计理念

RocketMQ实践篇之集成SpringBoot

生产者

生产者只需要一个即可发送不同种类的消息,故直接注入一个即可,使用时直接引入使用

消费者

不同的主题有不同的消费逻辑,所以只是创建一个工具类,使用时自行注入并实现listener

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<!-- lombok 简化代码(非必要) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

配置文件

1
2
3
4
5
6
7
# 非必要,自行定义的配置,方便使用
# rocketmq
rocketmq:
namesrv: 192.168.3.30:9876
seckill:
topic: seckill_topic
groupname: seckill_group

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Component
@Slf4j
public class RocketMQProducer implements InitializingBean, DisposableBean {
@Value("${rocketmq.namesrv}")
private String namesrvAddr;
private DefaultMQProducer producer;
/**
* 应用停止时自动调用,停止MQ
* <br/>
* @author KThirty
* @since 2020/5/30 13:31
*/
@Override
public void destroy() throws Exception {
if(producer != null){
log.info("RocketMQ停止");
producer.shutdown();
}
}
/**
* 应用启动完成后自动调用,用来初始化并启动生产者
* <br/>
* @author KThirty
* @since 2020/5/30 13:32
*/
@Override
public void afterPropertiesSet() throws Exception {
producer = new DefaultMQProducer("defaultMQProducer");
producer.setInstanceName("mqInstance_" + 1);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.start();
log.info("RocketMQ生成者启动成功,namesrv地址为[{}]",namesrvAddr);
}

/**
* 发送消息
* @param topic 主题
* @param message 发送消息
*/
public SendResult send(String topic, String message) throws Exception{
log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message);
Message msg = new Message(topic, message.getBytes());
return producer.send(msg);
}
/**
* 发送消息(超时时间)
* @param topic 主题
* @param message 消息
* @param timeout 超时时间
*/
public SendResult send(String topic, String message,long timeout) throws Exception{
log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message);
Message msg = new Message(topic, message.getBytes());
return producer.send(msg,timeout);
}
/**
* 发送消息(回调)
* @param topic 主题
* @param message 消息
* @param sendCallback 回调信息
*/
public void send(String topic, String message, SendCallback sendCallback) throws Exception{
log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message);
Message msg = new Message(topic, message.getBytes());
producer.send(msg,sendCallback);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
@AllArgsConstructor
public class RocketMQConsumer implements InitializingBean, DisposableBean {
private MessageListenerConcurrently messageListener;
private String namesrvAddr;
private String topic;
private String groupName;
private DefaultMQPushConsumer consumer;
private MessageModel messageModel;

@Override
public void afterPropertiesSet() throws Exception {
Assert.hasText(namesrvAddr,"namesrvAddr地址不能为空");
Assert.hasText(topic,"topic不能为空");
Assert.notNull(messageListener,"messageListener不能为null");
try {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setInstanceName("mqConsumerInstance_" + 1);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, "*");
consumer.setMessageModel(messageModel);//消费模式
consumer.registerMessageListener(messageListener);
consumer.setVipChannelEnabled(false);
consumer.start();
log.info("RocketMQ消费者启动成功");
} catch (Exception e) {
log.error("RocketMQ消费者启动异常",e);
}
}

@Override
public void destroy() throws Exception {
if(this.consumer != null){
this.consumer.shutdown();
}
}
}

秒杀

配置Listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class SeckillMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
String body = new String(list.get(0).getBody());
log.info("秒杀消费消息:[{}]",body);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

注入消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class SeckillMQConfig {
@Value("${rocketmq.namesrv}")
private String namesrv;
@Value("${rocketmq.seckill.topic}")
private String seckillTopic;
@Value("${rocketmq.seckill.groupname}")
private String seckillGroupName;

/**
* 注入秒杀mq message listener
* <br/>
* @return org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
* @author KThirty
* @since 2020/5/30 14:42
*/
@Bean
public MessageListenerConcurrently getSeckillMessageListener(){
return new SeckillMessageListener();
}
/**
*创建并注入消费者
* 使用集群方式消费(因为集群时,如果多次消费反而无效,只消费一次即可)
* 这里备注一下消费方式
* 1. 集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
* 2. 广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
* <br/>
* @return top.kthirty.rocketmq.RocketMQConsumer
* @author KThirty
* @since 2020/5/30 14:45
*/
@Bean
public RocketMQConsumer getSeckillMQConsumer(){
return new RocketMQConsumer(getSeckillMessageListener(),namesrv,seckillTopic,seckillGroupName,null, MessageModel.CLUSTERING);
}
}

秒杀接口

用于测试mq发送与消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@RestController
@RequestMapping("/seckill")
public class SeckillController {
@Value("${rocketmq.seckill.topic}")
private String seckillTopic;
@Autowired
private RocketMQProducer rocketMQProducer;
// 同步发送
@GetMapping("sysn")
public String skillSysn(){
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Map<String,Object> param = new HashMap<>();
param.put("userId",i);
param.put("goodsId",i);
try {
rocketMQProducer.send(seckillTopic, JSON.toJSONString(param));
} catch (Exception e) {
e.printStackTrace();
return "消息发送失败";
}
}
System.out.println("同步发送耗时:"+(System.currentTimeMillis()-start));
return "success";
}
// 异步发送
@GetMapping("aysn")
public String skillAysn(){
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Map<String,Object> param = new HashMap<>();
param.put("userId",i);
param.put("goodsId",i);
try {
rocketMQProducer.send(seckillTopic, JSON.toJSONString(param), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}

@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
} catch (Exception e) {
e.printStackTrace();
return "消息发送失败";
}
}
System.out.println("异步发送耗时:"+(System.currentTimeMillis()-start));
return "success";
}
}

启动并测试

启动后访问 http://localhost:8080/seckill/sysn (同步发送) 或者 http://localhost:8080/seckill/aysn (异步发送)

可以看到类似日志

1
2
RocketMQ发送消息,主题seckill_topic,消息内容{"goodsId":0,"userId":0}
秒杀消费消息:[{"goodsId":0,"userId":0}]

遇到的问题

  1. No route info of this topic原因是没有对应的主题,解决方式有两种

    1. 允许自动创建主题(测试环境适用)
    2. 自动去console添加主题(生产环境适用)
  2. sendDefaultImpl call timeout 网络访问超时

    一般是因为docker 网络问题,解决方式可以修改broker.conf 中的brokerIP1=192.168.3.30项修改为自己的网络IP,不使用docker 的网卡IP

附录