设计理念
RocketMQ实践篇之集成SpringBoot
生产者
一个生产者即可向不同的主题发送消息,因此只需注册一个全局的生产者Bean,使用时直接注入即可
消费者
不同的主题有不同的消费逻辑,所以这里只提供一个通用的消费者工具类,使用时自行创建对应的Bean并实现listener
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
<!-- Inherit dependency and plugin management from Spring Boot 2.3.0. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <!-- Web starter: provides the REST controller support used by the test endpoints. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Native RocketMQ client (DefaultMQProducer / DefaultMQPushConsumer). -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.0</version>
    </dependency>
    <!--
        The controller example calls JSON.toJSONString(...), which none of the
        dependencies above provide. NOTE(review): this is presumably Alibaba fastjson;
        confirm against the controller's imports and pin the version your project uses.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Lombok supplies @Slf4j and @AllArgsConstructor at compile time. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
|
配置文件
1 2 3 4 5 6 7
|
rocketmq:
  # Name server address (host:port); injected as ${rocketmq.namesrv}
  namesrv: 192.168.3.30:9876
  # Settings for the seckill example
  seckill:
    # Injected as ${rocketmq.seckill.topic}
    topic: seckill_topic
    # Injected as ${rocketmq.seckill.groupname}
    groupname: seckill_group
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| @Component @Slf4j public class RocketMQProducer implements InitializingBean, DisposableBean { @Value("${rocketmq.namesrv}") private String namesrvAddr; private DefaultMQProducer producer;
@Override public void destroy() throws Exception { if(producer != null){ log.info("RocketMQ停止"); producer.shutdown(); } }
@Override public void afterPropertiesSet() throws Exception { producer = new DefaultMQProducer("defaultMQProducer"); producer.setInstanceName("mqInstance_" + 1); producer.setNamesrvAddr(namesrvAddr); producer.setVipChannelEnabled(false); producer.start(); log.info("RocketMQ生成者启动成功,namesrv地址为[{}]",namesrvAddr); }
public SendResult send(String topic, String message) throws Exception{ log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message); Message msg = new Message(topic, message.getBytes()); return producer.send(msg); }
public SendResult send(String topic, String message,long timeout) throws Exception{ log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message); Message msg = new Message(topic, message.getBytes()); return producer.send(msg,timeout); }
public void send(String topic, String message, SendCallback sendCallback) throws Exception{ log.info("RocketMQ发送消息,主题{},消息内容{}", topic, message); Message msg = new Message(topic, message.getBytes()); producer.send(msg,sendCallback); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Slf4j @AllArgsConstructor public class RocketMQConsumer implements InitializingBean, DisposableBean { private MessageListenerConcurrently messageListener; private String namesrvAddr; private String topic; private String groupName; private DefaultMQPushConsumer consumer; private MessageModel messageModel;
@Override public void afterPropertiesSet() throws Exception { Assert.hasText(namesrvAddr,"namesrvAddr地址不能为空"); Assert.hasText(topic,"topic不能为空"); Assert.notNull(messageListener,"messageListener不能为null"); try { consumer = new DefaultMQPushConsumer(groupName); consumer.setInstanceName("mqConsumerInstance_" + 1); consumer.setNamesrvAddr(namesrvAddr); consumer.subscribe(topic, "*"); consumer.setMessageModel(messageModel); consumer.registerMessageListener(messageListener); consumer.setVipChannelEnabled(false); consumer.start(); log.info("RocketMQ消费者启动成功"); } catch (Exception e) { log.error("RocketMQ消费者启动异常",e); } }
@Override public void destroy() throws Exception { if(this.consumer != null){ this.consumer.shutdown(); } } }
|
秒杀
配置Listener
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Slf4j public class SeckillMessageListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { String body = new String(list.get(0).getBody()); log.info("秒杀消费消息:[{}]",body); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
|
注入消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Configuration public class SeckillMQConfig { @Value("${rocketmq.namesrv}") private String namesrv; @Value("${rocketmq.seckill.topic}") private String seckillTopic; @Value("${rocketmq.seckill.groupname}") private String seckillGroupName;
@Bean public MessageListenerConcurrently getSeckillMessageListener(){ return new SeckillMessageListener(); }
@Bean public RocketMQConsumer getSeckillMQConsumer(){ return new RocketMQConsumer(getSeckillMessageListener(),namesrv,seckillTopic,seckillGroupName,null, MessageModel.CLUSTERING); } }
|
秒杀接口
用于测试mq发送与消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| @RestController @RequestMapping("/seckill") public class SeckillController { @Value("${rocketmq.seckill.topic}") private String seckillTopic; @Autowired private RocketMQProducer rocketMQProducer; @GetMapping("sysn") public String skillSysn(){ long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Map<String,Object> param = new HashMap<>(); param.put("userId",i); param.put("goodsId",i); try { rocketMQProducer.send(seckillTopic, JSON.toJSONString(param)); } catch (Exception e) { e.printStackTrace(); return "消息发送失败"; } } System.out.println("同步发送耗时:"+(System.currentTimeMillis()-start)); return "success"; } @GetMapping("aysn") public String skillAysn(){ long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Map<String,Object> param = new HashMap<>(); param.put("userId",i); param.put("goodsId",i); try { rocketMQProducer.send(seckillTopic, JSON.toJSONString(param), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); }
@Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); } catch (Exception e) { e.printStackTrace(); return "消息发送失败"; } } System.out.println("异步发送耗时:"+(System.currentTimeMillis()-start)); return "success"; } }
|
启动并测试
启动后访问 http://localhost:8080/seckill/sysn
(同步发送) 或者 http://localhost:8080/seckill/aysn
(异步发送)
可以看到类似日志
1 2
| RocketMQ发送消息,主题seckill_topic,消息内容{"goodsId":0,"userId":0} 秒杀消费消息:[{"goodsId":0,"userId":0}]
|
遇到的问题
-
No route info of this topic:原因是broker上没有对应的主题,解决方式有两种
- 允许自动创建主题,即在broker配置中设置autoCreateTopicEnable=true(测试环境适用)
- 手动去console添加主题(生产环境适用)
-
sendDefaultImpl call timeout:网络访问超时
一般是因为docker 网络问题,解决方式是修改broker.conf
中的brokerIP1
项,将其设置为自己的网络IP(例如brokerIP1=192.168.3.30),不使用docker 的网卡IP
附录