整合RocketMQ
整合RocketMQ
添加依赖
xml文件
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
rocketmq配置
编写配置文件 config/rocketmq.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24# 指定namesrv地址
rocketmq.namesrvAddr=192.168.56.101:9876
#生产者group名称
rocketmq.producerGroupName=user_group
#事务生产者group名称
rocketmq.transactionProducerGroupName=order_transaction
#消费者group名称
rocketmq.consumerGroupName=user_consumer_group
#生产者实例名称
rocketmq.producerInstanceName=user_producer_instance
#消费者实例名称
rocketmq.consumerInstanceName=user_consumer_instance
#事务生产者实例名称
rocketmq.producerTranInstanceName=user_producer_transacition
#一次最大消费多少数量消息
rocketmq.consumerBatchMaxSize=1
#广播消费
rocketmq.consumerBroadcasting=false
#消费的topic:tag
rocketmq.subscribe[0]=topic_0
#启动的时候是否消费历史记录
rocketmq.enableHistoryConsumer=false
#启动顺序消费
rocketmq.enableOrderConsumer=falseyaml文件
1
2rocketmq:
name-server: 192.168.56.101:9876读取配置文件到 JavaBean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129package tk.fulsun.demo.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* @author fulsun
* @description: 读取配置文件信息
* @date 6/2/2021 5:23 PM
*/
public class RocketMQProperties {
private String namesrvAddr;
private String producerGroupName;
private String transactionProducerGroupName;
private String consumerGroupName;
private String producerInstanceName;
private String consumerInstanceName;
private String producerTranInstanceName;
private int consumerBatchMaxSize;
private boolean consumerBroadcasting;
private boolean enableHistoryConsumer;
private boolean enableOrderConsumer;
private List<String> subscribe = new ArrayList<String>();
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getProducerGroupName() {
return producerGroupName;
}
public void setProducerGroupName(String producerGroupName) {
this.producerGroupName = producerGroupName;
}
public String getTransactionProducerGroupName() {
return transactionProducerGroupName;
}
public void setTransactionProducerGroupName(String transactionProducerGroupName) {
this.transactionProducerGroupName = transactionProducerGroupName;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public String getProducerInstanceName() {
return producerInstanceName;
}
public void setProducerInstanceName(String producerInstanceName) {
this.producerInstanceName = producerInstanceName;
}
public String getConsumerInstanceName() {
return consumerInstanceName;
}
public void setConsumerInstanceName(String consumerInstanceName) {
this.consumerInstanceName = consumerInstanceName;
}
public String getProducerTranInstanceName() {
return producerTranInstanceName;
}
public void setProducerTranInstanceName(String producerTranInstanceName) {
this.producerTranInstanceName = producerTranInstanceName;
}
public int getConsumerBatchMaxSize() {
return consumerBatchMaxSize;
}
public void setConsumerBatchMaxSize(int consumerBatchMaxSize) {
this.consumerBatchMaxSize = consumerBatchMaxSize;
}
public boolean isConsumerBroadcasting() {
return consumerBroadcasting;
}
public void setConsumerBroadcasting(boolean consumerBroadcasting) {
this.consumerBroadcasting = consumerBroadcasting;
}
public boolean isEnableHistoryConsumer() {
return enableHistoryConsumer;
}
public void setEnableHistoryConsumer(boolean enableHistoryConsumer) {
this.enableHistoryConsumer = enableHistoryConsumer;
}
public boolean isEnableOrderConsumer() {
return enableOrderConsumer;
}
public void setEnableOrderConsumer(boolean enableOrderConsumer) {
this.enableOrderConsumer = enableOrderConsumer;
}
public List<String> getSubscribe() {
return subscribe;
}
public void setSubscribe(List<String> subscribe) {
this.subscribe = subscribe;
}
}
生产者服务
注入生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52package tk.fulsun.demo.manager;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tk.fulsun.demo.config.RocketMQConfig;
import tk.fulsun.demo.config.RocketMQProperties;
/**
* TODO
* <pre>
*
* <pre>
* @author fulsun
* @date 2021/6/76:28
*/
public class Producer {
private RocketMQProperties rocketMQProperties;
private String producerGroup = "producer_group";
private DefaultMQProducer producer;
public Producer() {
producer = new DefaultMQProducer(producerGroup);
//指定nameserver地址,多个地址使用;分隔
producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
start();
}
public DefaultMQProducer getProducer() {
return this.producer;
}
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown() {
this.producer.shutdown();
}
}测试类
1
2
3
4
5
6
7
8
public void sendMsg() throws Exception {
DefaultMQProducer producer = this.producer.getProducer();
byte[] body = "hello 1".getBytes(RemotingHelper.DEFAULT_CHARSET);
Message message = new Message(rojectMQProperties.getSubscribe().get(0), "tag1", body);
SendResult send = producer.send(message);
System.out.println(send);
}
消息消费者服务
同生产者的依赖和配置文件
消息监听者:注解方式
1
2
3
4
5
6
7
8
9
10
public class Consumer implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("Receive message:"+message);
}
}代码方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Consumer {
DefaultMQPushConsumer defaultMQPushConsumer;
private RocketMQProperties rocketMQProperties;
// pull 拉取消息 push 推送消息 本质 拉取模式 好处 主动的拉取消息避免消费者的堆积
public void init() throws MQClientException {
defaultMQPushConsumer = new DefaultMQPushConsumer(rocketMQProperties.getConsumerGroupName());
// 订阅主题和 标签( * 代表所有标签)下信息
defaultMQPushConsumer.subscribe(rocketMQProperties.getSubscribe().get(0), "*");
// 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
defaultMQPushConsumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
// 注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
defaultMQPushConsumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
int size = msgs.size();
System.out.println("打印的数据是多少" + size);
for (MessageExt msg : msgs) {
try {
// String topic = msg.getTopic();
// String tags = msg.getTags();
// String keys = msg.getKeys();
// String body = new String(msg.getBody(), "utf-8");
// 再次修改数据库转态
System.out.println(
"consumeThread="
+ Thread.currentThread().getName()
+ "queueId="
+ msg.getQueueId()
+ ", content:"
+ new String(msg.getBody(), "utf-8"));
} catch (Exception e) {
e.printStackTrace();
// ACK应答:再次下发消息 消息失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// ACK应答:消费成功 删除消息 commitlog日志 进度 删除
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
System.out.println("消费者启动了~~~");
}
}
发送普通消息
生产者
1 | package com.example.rocketmqdemo; |
消费者
1 | package com.example.rocketmqdemo.jms; |
发送顺序消息
场景分析
顺序消费是指消息的产生顺序和消费顺序相同
有很多场景需要顺序消息,比如先买票再上车;京东买东西时,先下订单->付款->发货;等等
局部顺序消费
那么在RocketMQ里局部顺序消息又是如何怎么实现的呢?
要保证消息的顺序消费,有三个关键点
- 消息顺序发送
- 消息顺序存储
- 消息顺序消费
发送同一个业务编号【一个订单对应多个运单】【发送到同一个队列】【队列只能被一个消费者消费】 效率就会变低
全局顺序消费
- 所有发到mq的消息都被顺序消费,类似数据库中的binlog,需要严格保证全局操作的顺序性
- 那么RocketMQ中如何做才能保证全局顺序消费呢?
- 这就需要设置topic下读写队列数量为1, 性能差,线上不用
有序消息
只用一个broker,或者将具有同一标识的信息发送到指定的broker
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82private Producer producer;
private RocketMQProperties rocketMQProperties;
public void orderMsg()
throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String[] tags = new String[] {"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg =
new Message(
rocketMQProperties.getSubscribe().get(0),
tags[i % tags.length],
"KEY" + i,
body.getBytes());
SendResult sendResult =
producer
.getProducer()
.send(
msg,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // 根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
},
orderList.get(i).getOrderId()); // 订单id
System.out.println(
String.format(
"SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body));
}
}
//顺序消费者
Consumer() throws MQClientException {
defaultMQPushConsumer= new DefaultMQPushConsumer(consumerGroup);
defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"*");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
defaultMQPushConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
//决定了 List<MessageExt> list 数量,
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
int size = list.size();
System.out.println("打印的数据是多少"+size);
for (int i=0;i<list.size();i++){
try {
MessageExt messageExt = list.get(i);
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
TimeUnit.SECONDS.sleep(2);
String body = new String(messageExt.getBody(),"utf-8");
System.out.println(Thread.currentThread().getName()+""+"主题:"+topic+"二级标签:"+tags+"业务唯一:"+keys+"内部"+body);
} catch (Exception e) {
e.printStackTrace();
//ACK应答:再次下发消息 消息失败
return ConsumeOrderlyStatus.ROLLBACK;
}
}
//ACK应答:消费成功 删除消息 commitlog日志 进度 删除
return ConsumeOrderlyStatus.SUCCESS;
}
});
defaultMQPushConsumer.start();
System.out.println("消费者启动了~~~");
}
Tag标签-过滤消息
比如sql 过滤:工作做用处不多,面试也很少问
1
2
3defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"*");通配符
defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"jay"); 精确匹配
defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"jay || ikun"); 多个匹配
批量消息投递
批量发送消息可提高传递小消息的性能。同时也需要满足以下特征
批量消息要求必要具有同一
topic
、相同消息配置不支持延时消息
建议一个批量消息最好不要超过1MB大小
这一批消息的总大小不应超过4MB。
场景
- 批量发送营销业务,
- 发送推送
案例
1 | a |
发送事务消息
什么是事务消息
- @Transactional注解控制本地事务
- 通过消息来影响事务在服务端提交或是删除。全不成功,要不全部失败。保证消息发送之前本地事务执行成功
- RocketMq 4.3版本中开源了事务消息
- 最终一致性:临时中间状态 最终数据库修改为最终状态值 staus=支付中 status=支付成功
- 事务消息:通过【控制事务的提交或者回滚】
能解决什么问题?
- 分布式事务问题,分布式情况下最终最终一致性问题。
- 非原子性操作
示例
1 | package com.example.rocketmqdemo; |
事务消息原理
两个核心概念:两阶段提交、事务状态定时回查
COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息;
ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法
Half 半消息 broker 消费者看不到 topic 不是消费者订阅的。如果能收到就会给生产者发送一个 回复消息 half
执行:executeLocalTransaction 执行本地方法 数据库的增删改 ;根据状态 返回值
1
2
3
4// COMMIT_MESSAGE 提交成功 消费者就能收到消息
// LocalTransactionState.ROLLBACK_MESSAGE; 丢弃消息 告诉broker删除half消息 所有的消息都干掉
// LocalTransactionState.UNKNOW; 中间状态 不丢消息,也不成功 会查
// 网络中断收不到相应就会执行会查方法checkLocalTransaction1
2executeLocalTransaction 执行本地方法,提交本地事务
checkLocalTransaction broker调用会回查,返回值也决定了是否再次会查,如果返回commit也意味着本地事务执行成功 每隔一段时间会查,超过一定次数就认为失败1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void testTransaction() throws MQClientException {
TransactionMQProducer transactionMQProducer=new TransactionMQProducer("pay_group");
transactionMQProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
transactionMQProducer.setTransactionListener(new TransactionListener() {
//执行本地事务
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//业务代码的地方 [增删改查,状态 ,修改数据的状态 staus=1 支付中] 用户注册
//insert
System.out.println("执行本地方法");
//返回值决定了消费者能不能收到消息
// COMMIT_MESSAGE 提交成功 消费者就能收到消息
// LocalTransactionState.ROLLBACK_MESSAGE; 丢弃消息
// LocalTransactionState.UNKNOW; 中间状态 不丢消息,也不成功 会查
//status=1 8点整
return LocalTransactionState.UNKNOW;
}
//消息回查
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
/* String keys = messageExt.getKeys();
int count= select.byKesy(keys);*/
//8点5分 会查 查看数据库的状态是不是成功 stateu=1?
/*if(stateu=1){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.ROLLBACK_MESSAGE;
}*/
System.out.println("是否回查了checkLocalTransaction");
return LocalTransactionState.UNKNOW;
}
});
transactionMQProducer.start();
Message msg=new Message(RocketMQConfig.TOPIC_PAY,"transaction_tag","pay_1992","天青色3".getBytes());
//half 半消息
TransactionSendResult transactionSendResult = transactionMQProducer.sendMessageInTransaction(msg, 2);
System.out.println(transactionSendResult);
try {
TimeUnit.SECONDS.sleep(73);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("测试");
}
延时发送
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void delay()
throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message =
new Message(
rocketMQProperties.getSubscribe().get(0),
("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.getProducer().send(message);
}
}