博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ROCKETMQ——事务消息发送、消费示例
阅读量:6120 次
发布时间:2019-06-21

本文共 5251 字,大约阅读时间需要 17 分钟。

hot3.png

摘要

  1. 事务消息是在发送端提供一种消息回查机制,如果消息发送状态是LocalTransactionState.COMMIT_MESSAGE,则表明本地消息发送成功,如果消息发送状态是 ROLLBACK_MESSAGE,则消息发送失败,消费端不会消费这条消息;如果消息发送状态是UNKNOW,则会一直调用 listener 里面的checkLocalTransaction回查本地状态。事务消息只对消息发送端提供一种回查机制,消息消费端没有什么特殊的。Push模式的消费端,提供至少消费一次的保证。
  2. 事务消息失败会回查,最多15次,时间间隔一分钟。见
org.apache.rocketmq.common.BrokerConfig.transactionCheckMax = 15;org.apache.rocketmq.common.BrokerConfig.transactionCheckInterval = 60 * 1000;

示例

Producer消息发送

public class TransactionListenerImpl implements TransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    private ConcurrentHashMap
localTrans = new ConcurrentHashMap<>(); /** * 在你的prepare消息到达broker的时候调用这个方法 * 此时消息并没有被被消费者消费 * 如果本方法执行成功(commit)或者失败(rollback),则不会执行checkLocalTransaction方法 * UNKNOW,才会调用checkLocalTransaction回查,回查频率间隔1分钟 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); System.out.println("执行 executeLocalTransaction 方法"); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId());// if (null != status) {// switch (status) {// case 0:// return LocalTransactionState.COMMIT_MESSAGE;// case 1:// return LocalTransactionState.COMMIT_MESSAGE;// case 2:// return LocalTransactionState.ROLLBACK_MESSAGE;// default:// return LocalTransactionState.COMMIT_MESSAGE;// }// } System.out.println("执行 checkLocalTransaction 方法," + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(); return LocalTransactionState.COMMIT_MESSAGE; }}

Producer的listener

public class TransactionListenerImpl implements TransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    private ConcurrentHashMap
localTrans = new ConcurrentHashMap<>(); /** * 在你的prepare消息到达broker的时候调用这个方法 * 此时消息并没有被被消费者消费 * 如果本方法执行成功(commit)或者失败(rollback),则不会执行checkLocalTransaction方法 * UNKNOW,才会调用checkLocalTransaction回查,回查频率间隔1分钟 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); System.out.println("执行 executeLocalTransaction 方法"); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId());// if (null != status) {// switch (status) {// case 0:// return LocalTransactionState.COMMIT_MESSAGE;// case 1:// return LocalTransactionState.COMMIT_MESSAGE;// case 2:// return LocalTransactionState.ROLLBACK_MESSAGE;// default:// return LocalTransactionState.COMMIT_MESSAGE;// }// } System.out.println("执行 checkLocalTransaction 方法," + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(); return LocalTransactionState.COMMIT_MESSAGE; }}

consumer消息消费

public class PushConsumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP");        consumer.setNamesrvAddr("localhost:9876");        consumer.subscribe("TOPIC_TEST", "*");// topic  tag        /**         * 全新的消费组 才适用这些策略         * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息		 * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍         * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前         *///        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//        consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); for(MessageExt msg:msgs){ System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody())); } /** * CONSUME_SUCCESS 消费成功 * RECONSUME_LATER 重试消费,重试次数可以设置,默认是16次 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}

源码阅读

参考文档

转载于:https://my.oschina.net/liangxiao/blog/3007198

你可能感兴趣的文章
充分利用HTML标签元素 – 简单的xtyle前端框架
查看>>
设计模式(十一):FACADE外观模式 -- 结构型模式
查看>>
iOS xcodebuile 自动编译打包ipa
查看>>
程序员眼中的 SQL Server-执行计划教会我如何创建索引?
查看>>
【BZOJ】1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路(floyd)
查看>>
cmake总结
查看>>
数据加密插件
查看>>
linux后台运行程序
查看>>
win7 vs2012/2013 编译boost 1.55
查看>>
IIS7如何显示详细错误信息
查看>>
ViewPager切换动画PageTransformer使用
查看>>
coco2d-x 基于视口的地图设计
查看>>
C++文件读写详解(ofstream,ifstream,fstream)
查看>>
Android打包常见错误之Export aborted because fatal lint errors were found
查看>>
Tar打包、压缩与解压缩到指定目录的方法
查看>>
新手如何学习 jQuery?
查看>>
配置spring上下文
查看>>
Python异步IO --- 轻松管理10k+并发连接
查看>>
mysql-python模块编译问题解决
查看>>
Oracle中drop user和drop user cascade的区别
查看>>