整合RocketMQ

添加依赖

  • xml文件

    1
    2
    3
    4
    5
    6
    7
    <dependencies>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    </dependency>
    </dependencies>

rocketmq配置

  • 编写配置文件 config/rocketmq.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # 指定namesrv地址
    rocketmq.namesrvAddr=192.168.56.101:9876
    #生产者group名称
    rocketmq.producerGroupName=user_group
    #事务生产者group名称
    rocketmq.transactionProducerGroupName=order_transaction
    #消费者group名称
    rocketmq.consumerGroupName=user_consumer_group
    #生产者实例名称
    rocketmq.producerInstanceName=user_producer_instance
    #消费者实例名称
    rocketmq.consumerInstanceName=user_consumer_instance
    #事务生产者实例名称
    rocketmq.producerTranInstanceName=user_producer_transacition
    #一次最大消费多少数量消息
    rocketmq.consumerBatchMaxSize=1
    #广播消费
    rocketmq.consumerBroadcasting=false
    #消费的topic:tag
    rocketmq.subscribe[0]=topic_0
    #启动的时候是否消费历史记录
    rocketmq.enableHistoryConsumer=false
    #启动顺序消费
    rocketmq.enableOrderConsumer=false
  • yaml文件

    1
    2
    rocketmq:
    name-server: 192.168.56.101:9876
  • 读取配置文件到 JavaBean

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    package tk.fulsun.demo.config;

    import java.util.ArrayList;
    import java.util.List;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.stereotype.Component;

    /**
    * @author fulsun
    * @description: 读取配置文件信息
    * @date 6/2/2021 5:23 PM
    */
    @Component
    @Configuration
    @PropertySource("classpath:config/rocketmq.properties")
    @ConfigurationProperties(prefix = "rocketmq")
    public class RocketMQProperties {

    private String namesrvAddr;
    private String producerGroupName;
    private String transactionProducerGroupName;
    private String consumerGroupName;
    private String producerInstanceName;
    private String consumerInstanceName;
    private String producerTranInstanceName;
    private int consumerBatchMaxSize;
    private boolean consumerBroadcasting;
    private boolean enableHistoryConsumer;
    private boolean enableOrderConsumer;
    private List<String> subscribe = new ArrayList<String>();

    public String getNamesrvAddr() {
    return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
    this.namesrvAddr = namesrvAddr;
    }

    public String getProducerGroupName() {
    return producerGroupName;
    }

    public void setProducerGroupName(String producerGroupName) {
    this.producerGroupName = producerGroupName;
    }

    public String getTransactionProducerGroupName() {
    return transactionProducerGroupName;
    }

    public void setTransactionProducerGroupName(String transactionProducerGroupName) {
    this.transactionProducerGroupName = transactionProducerGroupName;
    }

    public String getConsumerGroupName() {
    return consumerGroupName;
    }

    public void setConsumerGroupName(String consumerGroupName) {
    this.consumerGroupName = consumerGroupName;
    }

    public String getProducerInstanceName() {
    return producerInstanceName;
    }

    public void setProducerInstanceName(String producerInstanceName) {
    this.producerInstanceName = producerInstanceName;
    }

    public String getConsumerInstanceName() {
    return consumerInstanceName;
    }

    public void setConsumerInstanceName(String consumerInstanceName) {
    this.consumerInstanceName = consumerInstanceName;
    }

    public String getProducerTranInstanceName() {
    return producerTranInstanceName;
    }

    public void setProducerTranInstanceName(String producerTranInstanceName) {
    this.producerTranInstanceName = producerTranInstanceName;
    }

    public int getConsumerBatchMaxSize() {
    return consumerBatchMaxSize;
    }

    public void setConsumerBatchMaxSize(int consumerBatchMaxSize) {
    this.consumerBatchMaxSize = consumerBatchMaxSize;
    }

    public boolean isConsumerBroadcasting() {
    return consumerBroadcasting;
    }

    public void setConsumerBroadcasting(boolean consumerBroadcasting) {
    this.consumerBroadcasting = consumerBroadcasting;
    }

    public boolean isEnableHistoryConsumer() {
    return enableHistoryConsumer;
    }

    public void setEnableHistoryConsumer(boolean enableHistoryConsumer) {
    this.enableHistoryConsumer = enableHistoryConsumer;
    }

    public boolean isEnableOrderConsumer() {
    return enableOrderConsumer;
    }

    public void setEnableOrderConsumer(boolean enableOrderConsumer) {
    this.enableOrderConsumer = enableOrderConsumer;
    }

    public List<String> getSubscribe() {
    return subscribe;
    }

    public void setSubscribe(List<String> subscribe) {
    this.subscribe = subscribe;
    }
    }

生产者服务

  • 注入生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    package tk.fulsun.demo.manager;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import tk.fulsun.demo.config.RocketMQConfig;
    import tk.fulsun.demo.config.RocketMQProperties;

    /**
    * TODO
    * <pre>
    *
    * <pre>
    * @author fulsun
    * @date 2021/6/76:28
    */
    @Component
    public class Producer {

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private String producerGroup = "producer_group";

    private DefaultMQProducer producer;

    public Producer() {
    producer = new DefaultMQProducer(producerGroup);
    //指定nameserver地址,多个地址使用;分隔
    producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
    start();
    }

    public DefaultMQProducer getProducer() {
    return this.producer;
    }

    public void start() {
    try {
    this.producer.start();
    } catch (MQClientException e) {
    e.printStackTrace();
    }
    }

    public void shutdown() {
    this.producer.shutdown();
    }

    }

  • 测试类

    1
    2
    3
    4
    5
    6
    7
    8
    @Test
    public void sendMsg() throws Exception {
    DefaultMQProducer producer = this.producer.getProducer();
    byte[] body = "hello 1".getBytes(RemotingHelper.DEFAULT_CHARSET);
    Message message = new Message(rojectMQProperties.getSubscribe().get(0), "tag1", body);
    SendResult send = producer.send(message);
    System.out.println(send);
    }

消息消费者服务

  1. 同生产者的依赖和配置文件

  2. 消息监听者:注解方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "topic_0", consumerGroup = "user_consumer_group")
    public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
    log.info("Receive message:"+message);
    }
    }
  3. 代码方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    @Component
    public class Consumer {
    DefaultMQPushConsumer defaultMQPushConsumer;
    @Autowired private RocketMQProperties rocketMQProperties;

    // pull 拉取消息 push 推送消息 本质 拉取模式 好处 主动的拉取消息避免消费者的堆积
    @PostConstruct
    public void init() throws MQClientException {

    defaultMQPushConsumer = new DefaultMQPushConsumer(rocketMQProperties.getConsumerGroupName());
    // 订阅主题和 标签( * 代表所有标签)下信息
    defaultMQPushConsumer.subscribe(rocketMQProperties.getSubscribe().get(0), "*");
    // 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    defaultMQPushConsumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
    defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
    // 注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
    defaultMQPushConsumer.registerMessageListener(
    new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
    // msgs中只收集同一个topic,同一个tag,并且key相同的message
    // 会把不同的消息分别放置到不同的队列中
    List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    int size = msgs.size();
    System.out.println("打印的数据是多少" + size);
    for (MessageExt msg : msgs) {
    try {
    // String topic = msg.getTopic();
    // String tags = msg.getTags();
    // String keys = msg.getKeys();
    // String body = new String(msg.getBody(), "utf-8");
    // 再次修改数据库转态
    System.out.println(
    "consumeThread="
    + Thread.currentThread().getName()
    + "queueId="
    + msg.getQueueId()
    + ", content:"
    + new String(msg.getBody(), "utf-8"));
    } catch (Exception e) {
    e.printStackTrace();
    // ACK应答:再次下发消息 消息失败
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    }
    // ACK应答:消费成功 删除消息 commitlog日志 进度 删除
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    defaultMQPushConsumer.start();
    System.out.println("消费者启动了~~~");
    }
    }

发送普通消息

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.example.rocketmqdemo;

import com.example.rocketmqdemo.config.RocketMQConfig;
import com.example.rocketmqdemo.jms.Producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.bind.annotation.GetMapping;

@SpringBootTest
class RocketmqDemoApplicationTests {

@Autowired
private Producer producer;

@Test
public void test() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// String topic, 一级消息 用户类的消息 注册,登录,注销
// String tags, 二级分类 注销
// String keys, 唯一标签 手机号,身份证号
// byte[] body 内容
Message msg=new Message(RocketMQConfig.TOPIC,"zhuxiao","17600000000","我要注销手机号".getBytes());

SendResult send = producer.getProducer().send(msg);

System.out.println(send);

// [sendStatus=SEND_OK, 表示发送成功 其他状态都是失败
// msgId=0A08008A30F818B4AAC22798E44F0000, mq 为我们生成的唯一值,可能重复 查消息轨迹
// offsetMsgId=AF187DEB00002A9F00000000000007C7,
// messageQueue=MessageQueue
// topic=topic_zhoujielun, 主题
// brokerName=broker-a, broker名字
// queueId=3 , 第三个队列
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.example.rocketmqdemo.jms;


import com.example.rocketmqdemo.config.RocketMQConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class Consumer {

private String consumer="consumer_group_01";

private DefaultMQPushConsumer defaultMQPushConsumer;

Consumer1() throws MQClientException {
//
defaultMQPushConsumer=new DefaultMQPushConsumer(consumer);
//* 表示topic下队列所有的消息都监听
defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"*");
//消费位置,从队列最后消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//连接nameServer
defaultMQPushConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

try {
MessageExt messageExt = list.get(0);
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
String body = new String(messageExt.getBody(),"utf-8");
System.out.println("线程名"+Thread.currentThread().getName()+"消息主题:"+topic+"过滤标签:"+tags+"唯一值:"+keys+"内容:"+body);
//Ack应答
//ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 表示成功
//ConsumeConcurrentlyStatus.RECONSUME_LATER; 消息重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
System.out.println("处理失败了=----------");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
defaultMQPushConsumer1.start();
System.out.println("消费者启动启动了");
}

}

发送顺序消息

场景分析

  • 顺序消费是指消息的产生顺序和消费顺序相同

  • 有很多场景需要顺序消息,比如先买票再上车;京东买东西时,先下订单->付款->发货;等等

局部顺序消费

  • 那么在RocketMQ里局部顺序消息又是如何怎么实现的呢?

  • 要保证消息的顺序消费,有三个关键点

    1. 消息顺序发送
    2. 消息顺序存储
    3. 消息顺序消费
  • 发送同一个业务编号【一个订单对应多个运单】【发送到同一个队列】【队列只能被一个消费者消费】 效率就会变低

全局顺序消费

  • 所有发到mq的消息都被顺序消费,类似数据库中的binlog,需要严格保证全局操作的顺序性
  • 那么RocketMQ中如何做才能保证全局顺序消费呢?
    • 这就需要设置topic下读写队列数量为1, 性能差,线上不用

有序消息

  • 只用一个broker,或者将具有同一标识的信息发送到指定的broker

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    @Autowired private Producer producer;
    @Autowired private RocketMQProperties rocketMQProperties;

    @GetMapping("mq/order")
    public void orderMsg()
    throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    String[] tags = new String[] {"TagA", "TagC", "TagD"};
    // 订单列表
    List<OrderStep> orderList = buildOrders();
    Date date = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String dateStr = sdf.format(date);
    for (int i = 0; i < 10; i++) {
    // 加个时间前缀
    String body = dateStr + " Hello RocketMQ " + orderList.get(i);
    Message msg =
    new Message(
    rocketMQProperties.getSubscribe().get(0),
    tags[i % tags.length],
    "KEY" + i,
    body.getBytes());

    SendResult sendResult =
    producer
    .getProducer()
    .send(
    msg,
    new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    Long id = (Long) arg; // 根据订单id选择发送queue
    long index = id % mqs.size();
    return mqs.get((int) index);
    }
    },
    orderList.get(i).getOrderId()); // 订单id

    System.out.println(
    String.format(
    "SendResult status:%s, queueId:%d, body:%s",
    sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body));
    }
    }


    //顺序消费者
    Consumer() throws MQClientException {

    defaultMQPushConsumer= new DefaultMQPushConsumer(consumerGroup);
    defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"*");
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    defaultMQPushConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
    //决定了 List<MessageExt> list 数量,
    defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
    defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    int size = list.size();
    System.out.println("打印的数据是多少"+size);
    for (int i=0;i<list.size();i++){

    try {
    MessageExt messageExt = list.get(i);
    String topic = messageExt.getTopic();
    String tags = messageExt.getTags();
    String keys = messageExt.getKeys();
    TimeUnit.SECONDS.sleep(2);
    String body = new String(messageExt.getBody(),"utf-8");
    System.out.println(Thread.currentThread().getName()+""+"主题:"+topic+"二级标签:"+tags+"业务唯一:"+keys+"内部"+body);
    } catch (Exception e) {
    e.printStackTrace();
    //ACK应答:再次下发消息 消息失败
    return ConsumeOrderlyStatus.ROLLBACK;
    }
    }
    //ACK应答:消费成功 删除消息 commitlog日志 进度 删除
    return ConsumeOrderlyStatus.SUCCESS;
    }
    });
    defaultMQPushConsumer.start();
    System.out.println("消费者启动了~~~");
    }

Tag标签-过滤消息

  • 比如sql 过滤:工作做用处不多,面试也很少问

    1
    2
    3
    defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"*");通配符
    defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"jay"); 精确匹配
    defaultMQPushConsumer.subscribe(RocketMQConfig.TOPIC,"jay || ikun"); 多个匹配

批量消息投递

  • 批量发送消息可提高传递小消息的性能。同时也需要满足以下特征

  • 批量消息要求必要具有同一topic、相同消息配置

  • 不支持延时消息

  • 建议一个批量消息最好不要超过1MB大小

  • 这一批消息的总大小不应超过4MB。

场景

  1. 批量发送营销业务,
  2. 发送推送

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
a@Test
public void testBatch() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

//String topic, 逻辑分类 一级消息 用户,订单类,支付类
// String tags, 注册用户,注销用户,修改用户信息发送一个消息 二级分类
// String keys, 业务上的唯一值 幂等处理 手机号,身份证号
// byte[] body, 内部

List<Message> list=new ArrayList<>();
for (int i=0;i<20;i++){
Message message=new Message(RocketMQConfig.TOPIC,"jay",(i+"18月的肖邦"),(i+"鞋子特大号").getBytes());
list.add(message);
}
//DefaultMQProducer 同步返回
SendResult send = producer.getProducer().send(list);

// 发送消息SendResult
// [sendStatus=SEND_OK, 表示成功,其他的状态收拾失败
// msgId=C0A801680AA818B4AAC22B44F9EB0000, 唯一的id mq 内部生产 msgid可能重复 消息轨迹
// offsetMsgId=AF187DEB00002A9F0000000000000000,
// messageQueue=MessageQueue
// [topic=Topic_JAY, 主题 逻辑一集分类
// brokerName=broker-a, broke名字
// queueId=3], 发动到队列的位置 4 个
// queueOffset=0]
System.out.println("发送消息"+send);

}

发送事务消息

什么是事务消息

  • @Transactional注解控制本地事务
  • 通过消息来影响事务在服务端提交或是删除。全不成功,要不全部失败。保证消息发送之前本地事务执行成功
  • RocketMq 4.3版本中开源了事务消息
  • 最终一致性:临时中间状态 最终数据库修改为最终状态值 staus=支付中 status=支付成功
  • 事务消息:通过【控制事务的提交或者回滚】

能解决什么问题?

  • 分布式事务问题,分布式情况下最终最终一致性问题。
  • 非原子性操作

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.example.rocketmqdemo;

import com.example.rocketmqdemo.config.RocketMQConfig;
import com.example.rocketmqdemo.jms.Producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.HashMap;

import static com.example.rocketmqdemo.config.RocketMQConfig.TOPIC_PAY;

@SpringBootTest
class RocketmqDemoApplicationTests {

HashMap<String,String> hashMap=new HashMap<>();

@Autowired
private Producer producer;

@Test
public void test() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// String topic, 一级消息 用户类的消息 注册,登录,注销
// String tags, 二级分类 注销
// String keys, 唯一标签 手机号,身份证号
// byte[] body 内容
Message msg=new Message(RocketMQConfig.TOPIC,"zhuxiao33","17600000000","我要注销手机号".getBytes());

SendResult send = producer.getProducer().send(msg);

System.out.println(send);

// [sendStatus=SEND_OK, 表示发送成功 其他状态都是失败
// msgId=0A08008A30F818B4AAC22798E44F0000, mq 为我们生成的唯一值,可能重复 查消息轨迹
// offsetMsgId=AF187DEB00002A9F00000000000007C7,
// messageQueue=MessageQueue
// topic=topic_zhoujielun, 主题
// brokerName=broker-a, broker名字
// queueId=3 , 第三个队列
}

@Test
public void testTransaction() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

TransactionMQProducer transactionMQProducer=new TransactionMQProducer("pay_transaction_group");
transactionMQProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//这里面做实际业务:比如增删改差
try{
int i=(Integer)o;
byte[] body = message.getBody();
String s = new String(body, "utf-8");
String keys = message.getKeys();
System.out.println("执行业务代码了---->"+keys);
//broker就会让消费者消费消息
return LocalTransactionState.UNKNOW;
}catch (Exception e){
//broker 就会 删除消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}

}

/**
* 检查executeLocalTransaction执行的状态
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//去查数据库
String s = hashMap.get(messageExt.getKeys());
System.out.println("开始会查消息");
if(s!=null){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
transactionMQProducer.start();


Message msg=new Message(RocketMQConfig.TOPIC_PAY,"transaction_tag","pay_1992","天青色2".getBytes());
TransactionSendResult transactionSendResult = transactionMQProducer.sendMessageInTransaction(msg, 2);

System.out.println("发送事务消息的结果"+transactionSendResult);

}

}

事务消息原理

  • 两个核心概念:两阶段提交、事务状态定时回查

    RocetMQ事务消息设计图

  • COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息;

  • ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;

  • UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法

  • Half 半消息 broker 消费者看不到 topic 不是消费者订阅的。如果能收到就会给生产者发送一个 回复消息 half

  • 执行:executeLocalTransaction 执行本地方法 数据库的增删改 ;根据状态 返回值

    1
    2
    3
    4
    // COMMIT_MESSAGE 提交成功 消费者就能收到消息
    // LocalTransactionState.ROLLBACK_MESSAGE; 丢弃消息 告诉broker删除half消息 所有的消息都干掉
    // LocalTransactionState.UNKNOW; 中间状态 不丢消息,也不成功 会查
    // 网络中断收不到相应就会执行会查方法checkLocalTransaction
    1
    2
    executeLocalTransaction 执行本地方法,提交本地事务
    checkLocalTransaction broker调用会回查,返回值也决定了是否再次会查,如果返回commit也意味着本地事务执行成功 每隔一段时间会查,超过一定次数就认为失败
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
      @Test
    public void testTransaction() throws MQClientException {

    TransactionMQProducer transactionMQProducer=new TransactionMQProducer("pay_group");
    transactionMQProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
    transactionMQProducer.setTransactionListener(new TransactionListener() {
    //执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //业务代码的地方 [增删改查,状态 ,修改数据的状态 staus=1 支付中] 用户注册
    //insert
    System.out.println("执行本地方法");
    //返回值决定了消费者能不能收到消息
    // COMMIT_MESSAGE 提交成功 消费者就能收到消息
    // LocalTransactionState.ROLLBACK_MESSAGE; 丢弃消息
    // LocalTransactionState.UNKNOW; 中间状态 不丢消息,也不成功 会查
    //status=1 8点整
    return LocalTransactionState.UNKNOW;
    }
    //消息回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    /* String keys = messageExt.getKeys();
    int count= select.byKesy(keys);*/
    //8点5分 会查 查看数据库的状态是不是成功 stateu=1?
    /*if(stateu=1){
    return LocalTransactionState.COMMIT_MESSAGE;
    }else{
    return LocalTransactionState.ROLLBACK_MESSAGE;
    }*/
    System.out.println("是否回查了checkLocalTransaction");
    return LocalTransactionState.UNKNOW;
    }
    });
    transactionMQProducer.start();
    Message msg=new Message(RocketMQConfig.TOPIC_PAY,"transaction_tag","pay_1992","天青色3".getBytes());
    //half 半消息
    TransactionSendResult transactionSendResult = transactionMQProducer.sendMessageInTransaction(msg, 2);

    System.out.println(transactionSendResult);


    try {
    TimeUnit.SECONDS.sleep(73);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("测试");

    }

延时发送

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @GetMapping("mq/delay")
    public void delay()
    throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    int totalMessagesToSend = 10;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message message =
    new Message(
    rocketMQProperties.getSubscribe().get(0),
    ("Hello scheduled message " + i).getBytes());
    // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    message.setDelayTimeLevel(3);
    // 发送消息
    producer.getProducer().send(message);
    }
    }