先问一个问题:RocketMQ是怎么确保音讯与数据库业务的一致性?
第一时间或许会想到RocketMQ的业务音讯
咱们以日常开发中的案例来进行剖析:下单送积分。用户在下单后,订单体系保存订单数据,然后发送音讯到MQ,积分体系订阅这个音讯,然后给用户加积分。这就引出了一个问题,从生产者订单体系视点看,到底是先写库仍是先发音讯 呢?那咱们接下来就别离看下这两种情况。
1. 先写库后发音讯
咱们先经过一段伪代码来剖析下:
public void createOrder(final Order order) throws Exception {
//模仿spirng的tx模板
transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
//本地数据刺进
orderMapper.save(order);
orderDetailMapper.save(order.getOrderDetail());
//模仿 mq 发送音讯
SendResult send = producer.send(orderMessage);;
if (send.getSendStatus() == SendStatus.SEND_OK) {
status.setRollbackOnly();
}
return Boolean.TRUE;
}
});
}
咱们来剖析下它的进程:
首要,履行本地数据库业务,刺进数据,注意此刻还没有commit, 紧接着发送音讯到MQ, 这中心或许因为网络波动等原因,导致生产者迟迟没有收到broker的响应成果,比方5s内都没有回来SendResult给生产者,这也就意味着这5s内本地数据库业务是无法commit的,假如在高并发的场景下,数据库连接资源很快就会被耗尽,后续的请求则无法处理,最终体系将会崩溃。
既然咱们知道了先写库后发音讯有这样的问题,那么假如是先发音讯后写库呢?
2.先发音讯后写库
咱们仍是先看下代码:
public void createOrder(Order order) {
try {
//先发送音讯
SendResult send = producer.send(orderMessage);
if (send.getSendStatus() == SendStatus.SEND_OK) {
orderMapper.save(order);
orderDetailMapper.save(order.getOrderDetail());
//提交业务
connection.commit();
}
} catch (Exception e) {
//回滚
connection.rollback();
}
}
这样也是有问题的:
- 首要他也存在
先写库后发音讯
的问题,一旦MQ因为网络等原因长期没有回来SendResult给生产者,将会导致本地业务无法被提交或回滚,高并发下资源将会被快速耗尽。 - 其次,生产者将音讯发送出去并快速响应了,可是履行本地数据库业务时呈现了错误,比方上述代码中的
orderMapper.save(order)
履行出错了,这也就意味着音讯现已发送出去,顾客能够消费了,可是此刻本地业务失利了,为了弥补错误,此刻或许需求“回滚
”之前发送的音讯,可是此刻这条音讯或许现已被消费了,就算没有被消费,每次我都在发送音讯后判别是否呈现了反常,假如呈现了反常在发送条”回滚
“的音讯,这无疑是增加了开发的复杂度,也显得冗余。
那么有没有什么更好的办法,既能够不堵塞本地数据库业务,还能确保最终一致性呢?
这就是接下来咱们要说的RocketMQ的业务音讯,它能够确保本地业务与MQ音讯的最终一致性。
业务音讯咱们之前有剖析过它的源码和流程,这儿咱们简略看下
知道了业务音讯的大致流程后,接下来咱们仍是经过伪代码来看下它的实现进程。
- 发送业务音讯
发送的topic是 “tx_order_topic”,顾客订阅的也是这个,可是在发送到broker时,他会在内部将咱们的topic做一次修正,这样对顾客就不行见了。
@Slf4j
@Controller
public class OrderCreateController {
//rocketmq 发送音讯的模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
@ResponseBody
@GetMapping("/order/{buyer}")
public String createOrder(@PathVariable String buyer) {
//@Accessors(chain = true)
OrderDetail orderDetail = new OrderDetail();
orderDetail.setPhone("18883858508").setAddress("上海外滩xxxxx").setOrderDetailId(UUID.randomUUID().toString());
Order order = new Order();
order.setOrderId(UUID.randomUUID().toString()).setBuyer(buyer).setOrderDetail(orderDetail);
Message<Order> message = MessageBuilder.withPayload(order).build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_order_topic", message, null);
if (SendStatus.SEND_OK == result.getSendStatus()) {
log.info("发送音讯成功, result: {}", result);
}
//回查订单表
return "order create success";
}
}
rocketMQTemplate.sendMessageInTransaction(…)要等本地业务履行完毕,才会回来 TransactionSendResult
- 履行本地业务
@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderDetailMapper orderDetailMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("message: {}, args: {}", msg, arg);
String orderMsg = new String((byte[]) msg.getPayload());
final Order order = JSON.parseObject(orderMsg, Order.class);
log.info("order info : {}", order);
try {
//放到同一个本地业务中
this.transactionTemplate.executeWithoutResult(status -> {
this.orderMapper.saveOrder(order);
// int x = 1 / 0;
this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
});
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("保存订单失利", e);
//触发回查
return RocketMQLocalTransactionState.UNKNOWN;
//假如是ROLLBACK,则回滚音讯,rocketmq将抛弃这条音讯
}
}
//先忽略回查的逻辑
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {}
}
假如本地业务履行成功(订单正常入库),producer将给Broker发送一个COMMIT
的标识,此刻broker会将之前被替换了的topic给替换回去,这样顾客就能够消费了。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "qiuguan_test_consumer_group", topic = "tx_order_topic")
public class RewardsPoints implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
log.info("积分体系根据订单增加积分 : {}", message);
}
}
假如本地履行进程中发生了反常,比方网络抖动等,没有正常入库,此刻给Broker发送一个UNKNOW的标识,broker收到UNKNOW
标识后,默认按照每分钟一次的频率发起回查。
- 音讯回查
@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderDetailMapper orderDetailMapper;
@Autowired
private TransactionTemplate transactionTemplate;
//履行本地业务逻辑
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {}
//回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("履行本地业务回查:{}", LocalDateTime.now());
String orderMsg = new String((byte[]) msg.getPayload());
final Order order = JSON.parseObject(orderMsg, Order.class);
log.info("回查order: {}", order);
//回查次数
//int checkTimes = msg.getHeaders().get("TRANSACTION_CHECK_TIMES", Integer.class);
Order o = this.orderMapper.getOrder(order.getOrderId());
if (o == null) {
try {
this.transactionTemplate.executeWithoutResult(status -> {
this.orderMapper.saveOrder(order);
this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
});
} catch (Exception e) {
log.error("保存订单失利", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.COMMIT;
}
}
在回查的时分咱们能够检查数据库是否刺进了订单,假如没有,此刻咱们能够再次测验入库,假如入库成功,则响应给Broker一个COMMIT
标识,此刻该音讯就能够被顾客消费了,假如仍然入库失利,能够等候再次回查,或许回滚。假如是回滚,则Broker将丢掉该消费,顾客也将无法消费。
接下来咱们剖析下运用RocketMQ的业务音讯有哪些问题:
- 生产者发送业务音讯失利
这种情况就直接抛出反常即可,本地业务也不会履行,更不会存在数据不一致的问题。
- 生产者发送音讯成功,可是本地业务履行失利
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("message: {}, args: {}", msg, arg);
try {
this.transactionTemplate.executeWithoutResult(status -> {
this.orderMapper.saveOrder(order);
int x = 1 / 0;
this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
});
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("保存订单失利", e);
//回滚音讯
return RocketMQLocalTransactionState.ROLLBACK;
}
}
一旦本地业务履行失利,则数据库将会回滚,一起给broker发送ROLLBACK标识,broker收到该标识后,将抛弃掉这条音讯,顾客也无法消费这条音讯,这样也不会呈现数据不一致的问题。
-
生产者发送音讯成功,本地业务也履行成功,可是在生产者将
COMMIT
标识发送给broker时,发生了网络抖动,没有及时收到COMMIT
指令。
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("message: {}, args: {}", msg, arg);
try {
this.transactionTemplate.executeWithoutResult(status -> {
this.orderMapper.saveOrder(order);
this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
});
//网络抖动...
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("保存订单失利", e);
//回滚音讯
return RocketMQLocalTransactionState.ROLLBACK;
}
}
本地数据库业务履行成功,订单数据保存到表中,broker因为网络抖动没有及时收到
COMMIT
指令,此刻音讯仍是一条半业务音讯
,顾客仍是无法消费,这样本地业务与RocketMQ音讯的一致性就被破坏了。
RocketMQ为了处理这个问题,引入了音讯回查机制,关于半业务音讯,假如没有及时收到COMMIT/ROLLBACK
指令,它会测验主动与broker进行通信,调用监听器的 checkLocalTransaction(..)
办法再次承认之前的本地业务是否成功。
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("履行本地业务回查:{}", LocalDateTime.now());
final Order order = JSON.parseObject(new String((byte[]) msg.getPayload()), Order.class);
log.info("回查order: {}", order);
/**
* 因为之前本地业务现已履行成功,数据刺进了表中,只是在给broker发送COMMIT标识时发生了网络闪断
* 所以这儿回查的时分,是能够从数据库表中查询到订单数据的,此刻就能够给broker发送一个COMMIT标识
* 这样broker就会把这对顾客不行见的音讯修正为可见,此刻就能够消费了。
*/
Order o = this.orderMapper.getOrder(order.getOrderId());
/**
* 假如数据库中没有订单数据,说明之前的刺进就是失利的,此刻这儿测验再次刺进或许直接回滚就能够了
*/
return o == null ? RocketMQLocalTransactionState.ROLLBACK : RocketMQLocalTransactionState.COMMIT;
}
不难发现,运用RocketMQ的业务音讯具有以下好处:
将发送音讯和本地业务分脱离,假如发送音讯失利,则整个流程失利,不会堵塞本地业务,假如本地业务履行失利,则能够直接回滚或许回查,不会影响顾客。
好了,关于RocketMQ的业务音讯的实战就介绍到这儿,欢迎我们批评指正。