摘要
- 事务消息是在发送端提供一种消息回查机制,如果消息发送状态是LocalTransactionState.COMMIT_MESSAGE,则表明本地消息发送成功,如果消息发送状态是 ROLLBACK_MESSAGE,则消息发送失败,消费端不会消费这条消息;如果消息发送状态是UNKNOW,则会一直调用 listener 里面的checkLocalTransaction回查本地状态。事务消息只对消息发送端提供一种回查机制,消息消费端没有什么特殊的。Push模式的消费端,提供至少消费一次的保证。
- 事务消息失败会回查,最多15次,时间间隔一分钟。见
org.apache.rocketmq.common.BrokerConfig.transactionCheckMax = 15;org.apache.rocketmq.common.BrokerConfig.transactionCheckInterval = 60 * 1000;
示例
Producer消息发送
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMaplocalTrans = new ConcurrentHashMap<>(); /** * 在你的prepare消息到达broker的时候调用这个方法 * 此时消息并没有被被消费者消费 * 如果本方法执行成功(commit)或者失败(rollback),则不会执行checkLocalTransaction方法 * UNKNOW,才会调用checkLocalTransaction回查,回查频率间隔1分钟 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); System.out.println("执行 executeLocalTransaction 方法"); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId());// if (null != status) {// switch (status) {// case 0:// return LocalTransactionState.COMMIT_MESSAGE;// case 1:// return LocalTransactionState.COMMIT_MESSAGE;// case 2:// return LocalTransactionState.ROLLBACK_MESSAGE;// default:// return LocalTransactionState.COMMIT_MESSAGE;// }// } System.out.println("执行 checkLocalTransaction 方法," + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(); return LocalTransactionState.COMMIT_MESSAGE; }}
Producer的listener
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMaplocalTrans = new ConcurrentHashMap<>(); /** * 在你的prepare消息到达broker的时候调用这个方法 * 此时消息并没有被被消费者消费 * 如果本方法执行成功(commit)或者失败(rollback),则不会执行checkLocalTransaction方法 * UNKNOW,才会调用checkLocalTransaction回查,回查频率间隔1分钟 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); System.out.println("执行 executeLocalTransaction 方法"); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId());// if (null != status) {// switch (status) {// case 0:// return LocalTransactionState.COMMIT_MESSAGE;// case 1:// return LocalTransactionState.COMMIT_MESSAGE;// case 2:// return LocalTransactionState.ROLLBACK_MESSAGE;// default:// return LocalTransactionState.COMMIT_MESSAGE;// }// } System.out.println("执行 checkLocalTransaction 方法," + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.out.println(); return LocalTransactionState.COMMIT_MESSAGE; }}
consumer消息消费
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TOPIC_TEST", "*");// topic tag /** * 全新的消费组 才适用这些策略 * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息 * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 */// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); for(MessageExt msg:msgs){ System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody())); } /** * CONSUME_SUCCESS 消费成功 * RECONSUME_LATER 重试消费,重试次数可以设置,默认是16次 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}