数据不会平白无故丢掉,也不会莫名其妙添加
一、概述
1、曾几何时,知了在一家小公司做项目的时分,都是一个服务打天下,所以涉及到数据共同性的问题,都是直接用本地业务处理。
2、跟着时刻的推移,用户量增大了,发现一个Java服务扛不住了,所以技术大佬决议关于体系进行晋级。依据体系的业务关于单体的一个服务进行拆分,然后关于开发人员也进行划分,一个开发人员只开发和维护一个或几个服务中的问题,我们各司其职,分工合作。
3、当然服务拆分不是一蹴即至的,这是一个耗时耗力的庞大工程,大多数体系都是进行多轮拆分,然后渐渐构成一个稳定的体系。恪守一个中心思想:先按总体业务进行一轮拆分,后面再依据拆分后的服务模块,进行一个细致的拆分。
4、跟着服务拆分之后,用户量是抗住了,可是发现数据都在不同的服务中存取,这就引出了一个新的问题:跨服务器,如何确保数据的共同性? 当然,跨服务的分布式体系中不仅仅这个问题,还有其他的一些列问题,如:服务可用性、服务容错性、服务间调用的网络问题等等,这儿只评论数据共同性问题。
5、提到数据共同性,大致分为三种:强共同性、弱共同性、终究共同性。
- 强共同性:数据一旦写入,在任一时刻都能读取到最新的值。
- 弱共同性:当写入一个数据的时分,其他地方去读这些数据,或许查到的数据不是最新的
- 终究共同性:它是弱共同性的一个变种,不追求体系恣意时刻数据要到达共同,可是在必守时刻后,数据终究要到达共同。
从这三种共同型的模型上来说,我们能够看到,弱共同性和终究共同性一般来说是异步冗余的,而强共同性是同步冗余的,异步处理带来了更好的功能,但也需求处理数据的补偿。同步意味着简单,但也必然会降低体系的功能。
二、理论
上述说的数据共同性问题,其实也便是在说分布式业务的问题,现在有一些解决计划,相信我们多多少少都看到过,这儿带我们回顾下。
2.1、二阶段提交
2PC是一种强共同性设计计划,经过引进一个业务和谐器来和谐各个本地业务(也称为业务参与者)的提交和回滚。 2PC首要分为2个阶段:
1、第一阶段:业务和谐器会向每个业务参与者建议一个敞开业务的指令,每个业务参与者履行准备操作,然后再向业务和谐器回复是否准备完结。 可是不会提交本地业务, 可是这个阶段资源是需求被锁住的。
2、第二阶段: 业务和谐器收到每个业务参与者的回复后,计算每个参与者的回复,假设每个参与者都回复“能够提交”,那么业务和谐器会发送提交指令,参与者正式提交本地业务,释放一切资源,结束大局业务。可是有一个参与者回复“回绝提交”,那么业务和谐器发送回滚指令,一切参与者都回滚本地业务,待全部回滚完结,释放资源,吊销大局业务。
业务提交流程
业务回滚流程
当然2PC存在的问题这儿也提一下,一个是同步堵塞,这个会耗费功能。另一个是和谐器毛病问题,一旦和谐器产生毛病,那么一切的参与者处理资源确定状况,那么一切参与者都会被堵塞。
2.2、三阶段提交
3PC首要是在2PC的基础上做了改善,首要为了解决2PC的堵塞问题。它首要是将2PC的第一阶段分为2个过程,先准备,再确定资源,并且引进了超时机制(这也意味着会造成数据不共同)。3PC的三个阶段包含:CanCommit
、PreCommit
和 DoCommit
详细细节就不打开赘述了,就一个中心观点:在CanCommit的时分并不确定资源,除非一切参与者都同意了,才开端锁资源。
2.3、TCC柔性业务
比较较前面的2PC和3PC,TCC和那哥俩的本质区别便是它是业务层面的分布式业务,而2PC和3PC是数据库层面的。TCC是三个单词的缩写:Try
、Confirm
、Cancel
,也分为这三个流程。
Try:测验,即测验预留资源,确定资源
Confirm:确认,即履行预留的资源,假设履行失利会重试
Cancel:吊销,吊销预留的资源,假设履行失利会重试
从上图可知,TCC关于业务的侵入是很大的,并且紧紧的耦合在一起。TCC比较较2PC和3PC,试用范围更广,可完成跨库,跨不同体系去完成分布式业务。缺点是要在业务代码中去开发很多的逻辑完成这三个过程,需求和代码耦合在一起,进步开发成本。
业务日志:在TCC形式中,业务建议者和业务参与者都会去记录业务日志(业务状况、信息等)。这个业务日志是整个分布式业务呈现意外情况(宕机、重启、网络中止等),完成提交和回滚的关键。
幂等性:在TCC第二阶段,confirm或者cancel的时分,这两个操作都需求确保幂等性。一旦由于网络等原因导致履行失利,就会建议不断重试。
防悬挂:由于网络的不可靠性,有异常情况的时分,try恳求或许比cancel恳求更晚到达。cancel或许会履行空回滚,可是try恳求被履行的时分也不会预留资源。
2.4、Seata
关于seata这儿就不多提了,用的最多的是AT形式,上回知了逐渐剖析过,装备完后只需求在业务建议的办法上添加@GlobalTransactional
注解就能够敞开大局业务,关于业务无侵入,低耦合。感兴趣的话请参阅之前评论Seata的内容。
三、应用场景
知了之前在一家公司遇到过这样的业务场景;用户经过页面投保,提交一笔订单过来,这个订单经过上游服务,处理保单相关的业务逻辑,最终流入下流服务,处理成绩、人员提升、分润处理等等业务。关于这个场景,两头处理的业务逻辑不在同一个服务中,接入的是不同的数据库。涉及到数据共同性问题,需求用到分布式业务。
关于上面介绍的几种计划,只是评论了理论和思路,下面我来总结下这个业务场景中运用的一种完成计划。采用了本地音讯表+MQ异步音讯的计划完成了业务终究共同性,也符合其时的业务场景,相对强共同性,完成的功能较高。下面是该计划的思路图
- 实在业务处理的状况或许会有多种,因而需求明确哪种状况需求守时使命补偿
- 假设某条单据一直无法处理结束,守时使命也不能无限制下发,所以本地音讯表需求添加次序的概念,重试多少次后告警,人工介入处理
- 由于MQ和守时使命的存在,难免会呈现重复恳求,因而下流要做好幂等防重,否则会呈现重复数据,导致数据不共同
关于落地完成,话不多说,直接上代码。先定义两张表tb_order和tb_notice_message,分别存订单信息和本地业务信息
CREATE TABLE `tb_order` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`user_id` int(11) NOT NULL COMMENT '下单人id',
`order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '订单编号',
`insurance_amount` decimal(16,2) NOT NULL COMMENT '保额',
`order_amount` decimal(16,2) DEFAULT NULL COMMENT '保费',
`create_time` datetime DEFAULT NULL COMMENT '创建时刻',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时刻',
`is_delete` tinyint(4) DEFAULT '0' COMMENT '删去标识:0-不删去;1-删去',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`type` tinyint(4) NOT NULL COMMENT '业务类型:1-下单',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状况:1-待处理,2-已处理,3-预警',
`data` varchar(255) NOT NULL COMMENT '信息',
`retry_count` tinyint(4) DEFAULT '0' COMMENT '重试次数',
`create_time` datetime NOT NULL COMMENT '创建时刻',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时刻',
`is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删去标识:0-不删去;1-删去',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
处理订单service,这儿能够用到我们之前说过的装修器形式,去装修这个service。把保存本地业务,发送mq音讯,交给装修器类去做,而service只需求关怀业务逻辑即可,也符合开闭原则。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 10:58
* @description
*/
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {
private final OrderMapper orderMapper;
/**
* 订单处理办法:只处理订单相关逻辑
* @param o
* @return
*/
@Override
public Order handle(Object o) {
// 订单信息
Order order = Order.builder()
.orderNo("2345678")
.createTime(LocalDateTime.now())
.userId(1)
.insuranceAmount(new BigDecimal(2000000))
.orderAmount(new BigDecimal(5000))
.build();
orderMapper.insert(order);
return order;
}
}
新增OrderService
的装修类OrderServiceDecorate
,负责对订单逻辑的扩展,这儿是添加本地业务音讯,以及发送MQ信息,扩展办法添加了Transactional
注解,确保订单逻辑和本地业务音讯的数据在同一个业务中进行,确保原子性。其间业务音讯符号处理中,待下流服务处理完业务逻辑,再更新处理完结。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/14 18:48
* @description
*/
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {
private final NoticeMessageMapper noticeMessageMapper;
private final RabbitTemplate rabbitTemplate;
/**
* 装修办法:对订单处理逻辑进行扩展
* @param o
* @return
*/
@Override
@Transactional
public Object handle(Object o) {
// 调用service办法,完成保单逻辑
Order order = (Order) service.handle(o);
// 扩展:1、保存业务音讯,2、发送MQ音讯
// 本地业务音讯
String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
NoticeMessage noticeMessage = NoticeMessage.builder()
.retryCount(0)
.data(data)
.status(1)
.type(1)
.createTime(LocalDateTime.now())
.build();
noticeMessageMapper.insert(noticeMessage);
// 发送mq音讯
log.info("发送mq音讯....");
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
return null;
}
}
关于这个装修者形式,之前有讲到过,能够看下之前发布的内容。
下流服务监听音讯,处理完自己的业务逻辑后(如:成绩、分润、提升等),需求发送MQ,上游服务监听音讯,更新本地业务状况为已处理。这需求留意的是下流服务需求做幂等处理,避免异常情况下,上游服务数据的重试。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 18:07
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
public void orderHandler(String msg) {
log.info("监听到订单音讯:{}", msg);
// 需求留意幂等,幂等逻辑
log.info("下流服务业务逻辑。。。。。");
JSONObject json = JSONUtil.parseObj(msg);
rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
}
}
这儿插个题外话,关于幂等的处理,我这儿大致有两种思路 1、比如依据订单号查一下记录是否存在,存在就直接返回成功。 2、redis存一个仅有的恳求号,处理完再删去,不存在恳求号的直接返回成功,能够写个AOP去处理,与业务阻隔。 言归正传,上游服务音讯监听,下流发送MQ音讯,更新本地业务音讯为已处理,分布式业务流程结束。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 18:29
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {
@Autowired
private NoticeMessageMapper noticeMessageMapper;
@RabbitHandler
public void updateOrder(Integer msgId) {
log.info("监听音讯,更新本地业务音讯,音讯id:{}", msgId);
NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
noticeMessageMapper.updateById(msg);
}
}
存在异常情况时,会经过守时使命,轮询的往MQ中发送音讯,尽最大努力去让下流服务到达数据共同,当然重试也要设置上限;若到达上限今后还一直是失利,那不得不考虑是下流服务本身存在问题了(有或许便是代码逻辑存在问题)。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/14 10:25
* @description
*/
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {
private final RabbitTemplate rabbitTemplate;
private final NoticeMessageMapper noticeMessageMapper;
/**
* 最大自动重试次数
*/
private final Integer MAX_RETRY_COUNT = 5;
@Scheduled(cron = "0/20 * * * * ? ")
public void retry() {
log.info("守时使命,重试异常订单");
LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
wrapper.eq(NoticeMessage::getStatus, 1);
List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
for (NoticeMessage noticeMessage : noticeMessages) {
// 从头发送mq音讯
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
// 重试次数+1
noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
noticeMessageMapper.updateById(noticeMessage);
// 判别重试次数,等于最长限制次数,直接更新为报警状况
if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
noticeMessage.setStatus(3);
noticeMessageMapper.updateById(noticeMessage);
// 发送告警,通知对应人员
// 告警逻辑(短信、邮件、企微群,等等)....
}
}
}
}
其实这儿有个问题,一个上游服务对应多个下流服务的时分。这个时分往往不能存一条本地音讯记录。
- 这儿能够在音讯表多加个字段next_server_count,表明一个订单建议方,需求调用的下流服务数量。上游服务监听的时分,每次会与下流的回调都减去1,直到数值是0的时分,再更新状况是已处理。可是要操控并发,这个字段是被多个下流服务同享的。
- 还有一种处理计划是为每个下流服务,都记录一条业务音讯,用type字段去区分,符号类型。完成上游和下流关于业务音讯的一对一关系。
- 最终,到达最大重试次数今后,能够将音讯加入到一个告警列表,这个告警列表能够展现在办理后台或其他监控体系中,展现一些必要的信息,去供公司内部人员去人工介入,处理这种异常的数据,使得数据到达终究共同性。
四、总结
其实分布式业务没有一个完美的处理计划,只能说是尽量去满意业务需求,满意数据共同。假设程序不能处理了,最终由人工去兜底,做数据的补偿计划。
五、参阅源码
编程文档:
https://gitee.com/cicadasmile/butte-java-note
应用库房:
https://gitee.com/cicadasmile/butte-flyer-parent