无论是在开发过程中仍是在预备跑路的面试过程中,有关redis相关的,难免会涉及到四个特别场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。假如在开发中不留意这些场景的话,在高并发场景下有或许会导致体系崩溃,数据错乱等状况。现在,结合实践的事务场景来复现并解决这些问题。
相关技能:springboot2.2.2+mybatisplus3.1+redis5.0+hutool5.8
缓存穿透
缓存穿透是指查询缓存和数据库中都不存在的数据,导致一切的查询压力悉数给到了数据库。
比方查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,假如存在则直接回来,不然再查询数据库并将查询成果进行缓存。
@Slf4j
@Service
public class DocumentInfoServiceImpl extends ServiceImpl<DocumentInfoMapper, DocumentInfo> implements DocumentInfoService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) { //缓存命中
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if (ObjectUtil.isNotNull(documentInfo)) { // 缓存成果
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
}
@GetMapping("/doc/queryById")
public Result<DocumentInfo> queryById(@RequestParam(name = "docId") Integer docId) {
return Result.success(documentInfoService.getDocumentDetail(docId));
}
假如项意图并发量不大,这样写的话简直没啥问题。假如项意图并发量很大,那么这就存在一个隐藏问题,假如在拜访了一个不存在的文章(这个文章现已被分享出去,可是在后台或许是被删去或许下线状态),那么就会导致一切的恳求悉数需求到数据库中进行查询,然后给数据库形成压力,乃至形成宕机。 http://127.0.0.1:8081/doc/queryById?docId=不存在的id
2023-01-05 10:18:57.954 INFO 19692 --- [nio-8081-exec-8] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.121 INFO 19692 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.350 INFO 19692 --- [io-8081-exec-10] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.519 INFO 19692 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.661 INFO 19692 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.859 INFO 19692 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.012 INFO 19692 --- [nio-8081-exec-9] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.154 INFO 19692 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
解决计划一:缓存空方针
针对缓存穿透问题缓存空方针能够有用防止所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空方针并设置一个过期时刻(设置过期时刻是为了防止呈现数据库中存在了数据可是缓存中仍然是空数据现象),这样能够防止一切恳求悉数查询数据库的状况。
// 查询方针不存在
if(StrUtil.equals(obj,"")){
log.info("==== select from cache , data not available ====");
return null;
}
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
//假如数据不存在,则缓存一个空方针并设置过期时刻
stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS);
// if (ObjectUtil.isNotNull(documentInfo)) {
// stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
// }
}
2023-01-05 13:15:01.057 INFO 16600 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 13:15:01.214 INFO 16600 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.384 INFO 16600 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.540 INFO 16600 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.720 INFO 16600 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
解决计划二:布隆过滤器
缓存空方针的缺陷在于无论数据存不存在都需求查询一次数据库,并且redis中存储了许多的空数据,这个时分能够选用布隆过滤器来解决。布隆过滤器能够简略的了解为由一个很长的二进制数组结合n个hash算法核算出n个数组下标,将这些数据下标置为1。在查找数据时,再次经过n个hash算法核算出数组下标,假如这些下标的值为1,表示该值或许存在(存在hash抵触的原因),假如为0,则表示该值必定不存在。
/**
* 布隆过滤器增加元素伪代码
*/
BitArr[] bit = new BitArr[10000]; // 新建一个二进制数组
List<String> insertData = Arrays.asList("A", "B", "C"); // 待增加元素
for (String insertDatum : insertData) {
for (int i=1;i<=3;i++){ // 运用3中hash算法核算出3个数组下标
int bitIdx = hash_i(insertDatum); //hash1(insertDatum),hash2(insertDatum),hash3(insertDatum)
bit[bitIdx]=1; // 将下标元素置为1
}
}
/**
* 布隆过滤器查找元素伪代码
*/
BitArr[] bit = new BitArr[10000];
for (int i=1;i<=3;i++){
int bitIdx = hash_i("E"); //核算E的数组下标
if(bit[bitIdx]==0){ //假如对应的元素为0,则必定不存在
return false;
}
}
return true;
- 布隆过滤器的完成
在运用布隆过滤器时有两个中心参数,分别是预估的数据量size以及期望的误判率fpp,这两个参数咱们能够依据自己的事务场景和数据量进行自主设置。在完成布隆过滤器时,有两个中心问题,分别是hash函数的选取个数n以及确认bit数组的巨细len。
- 依据预估数据量size和误判率fpp,能够核算出bit数组的巨细len。
- 依据预估数据量size和bit数组的长度巨细len,能够核算出所需求的hash函数个数n。
单机版布隆过滤器
目前单机版的布隆过滤器完成方法有许多,比方Guava供给的BloomFilter,Hutool工具包中供给的BitMapBloomFilter等。以Guava为例,需求引入对应的依靠包,在BloomFilter类中供给了create办法来进行布隆过滤器的创建。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
public static BloomFilter<Integer> localBloomFilter = BloomFilter.create(Funnels.integerFunnel(),10000L,0.01);
创建完成后,将需求筛选的数据同步到过滤器中。
/**
* 单机版布隆过滤器数据初始化
*/
@PostConstruct
public void initDocumentDataLocal(){
List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
if(CollUtil.isNotEmpty(documentInfos)){
documentInfos.stream().map(DocumentInfo::getId).forEach(e->{
BloomFilterUtil.localBloomFilter.put(e);
});
}
}
在事务代码中,能够直接调用BloomFilter供给的mightContain办法,判别方针docId是否或许存在于过滤器中,假如或许存在,那么持续向下履行事务逻辑,不然直接中断履行。
@Override
public DocumentInfo getDocumentDetail(int docId) {
//布隆过滤器阻拦
boolean mightContain = BloomFilterUtil.localBloomFilter.mightContain(docId);
if(!mightContain){ //是否有或许存在于布隆过滤器中
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if(ObjectUtil.isNotNull(documentInfo)){
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
自界说分布式版布隆过滤器
自界说分布式布隆过滤器的存储依靠于redis的bitmap数据结构来完成,别的还需求界说四个参数,分别为预估数据量size,误判率fpp,数组巨细bitNum以及hash函数个数hashNum其间预估数据量和误判率需求配置在yml文件中。
@Resource
private StringRedisTemplate stringRedisTemplate;
@Value("${bloom.filter.size}")
private long size; // 预估数据量
@Value("${bloom.filter.fpp}")
private double fpp; // 误判率
private long bitNum; //数组巨细len
private int hashNum; // hash函数个数size
依据上面的两个公式,核算出相应的数组长度以及所需的hash函数个数,并在redis设置一个布隆过滤器的key。
/*
* 核算出数组长度,hash函数个数并初始化数组
*/
@PostConstruct
private void initBloom() {
this.bitNum = getNumOfBits(size, fpp);
this.hashNum = getNumOfHashFun(size, bitNum);
//借助redis的bitmap来完成二进制数组
stringRedisTemplate.opsForValue().setBit("bloom::filter", bitNum, false);
}
/**
* 核算bit数组巨细
*
* @param size
* @param fpp
* @return
*/
private long getNumOfBits(long size, double fpp) {
return (long) (-size * Math.log(fpp) / (Math.log(2) * Math.log(2)));
}
/**
* 核算所需的hash个数
*
* @param size
* @param numOfBits
* @return
*/
private int getNumOfHashFun(long size, long numOfBits) {
return Math.max(1, (int) Math.round((double) numOfBits / size * Math.log(2)));
}
别的,需求供给两个办法,分别为增加元素的putBloomFilterRedis办法和判别元素是否有或许存在的办法existBloomFilterRedis,其间的完成方法参阅了guava。
/**
* 像自界说布隆过滤器中增加元素
* @param key
*/
public void putBloomFilterRedis(String key) {
long hash64 = HashUtil.metroHash64(key.getBytes());
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= hashNum; i++) {
/**
* 上面不是说,要运用n个hash函数吗??为啥这儿直接用一个动态变量取乘积了呢???
* 不必担心,请看《Less Hashing, Same Performance: Building a Better Bloom Filter》,
* 里面论述了这种操作不会影响布隆过滤器的功能,毕竟hash的价值仍是很大的,这算是个有用的优化手段吧:
* A standard technique from the hashing literature is to use two hash
* functions h(x) and h(x) to simulate additional hash functions of the formg(x) = h(x) + ih(x) .
*/
int combinedHash = hash1 + i * hash2;
if (combinedHash < 0) {
//假如为负数,则取反(保证成果为正数)
combinedHash = ~combinedHash;
}
// 核算出数组下标,并将下标值置为1
int bitIdx = (int) (combinedHash % bitNum);
stringRedisTemplate.opsForValue().setBit("bloom::filter", bitIdx, true);
}
}
/**
* 判别自界说布隆过滤器中元素是否有或许存在
* @param key
* @return
*/
public boolean existBloomFilterRedis(String key) {
long hash64 = HashUtil.metroHash64(key.getBytes());
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= hashNum; i++) {
int combinedHash = hash1 + i * hash2;
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
int bitIdx = (int) (combinedHash % bitNum);
//判别下标值是否为1,假如不为1直接回来false
Boolean bit = stringRedisTemplate.opsForValue().getBit("bloom::filter", bitIdx);
if (!bit) {
return false;
}
}
return true;
}
办法完成后,将一切的key值数据同步到redis中。
@Component
public class BloomFilterInitData {
@Resource
private BloomFilterUtil bloomFilterUtil;
@Resource
private DocumentInfoService documentInfoService;
@PostConstruct
public void initDocumentData(){
List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
if(CollUtil.isNotEmpty(documentInfos)){
documentInfos.stream().map(m -> {
return "doc::info::" + m.getId().intValue();
}).forEach(e->{
bloomFilterUtil.putBloomFilterRedis(e);
});
}
}
}
上面悉数搞定后,启动项目并测验成果是否有用,在启动前先在数据表中搞几条测验数据。
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
// 布隆过滤器中是否有或许存在这个key
boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
if(!b){
// 假如不存在,直接回来空
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if(ObjectUtil.isNotNull(documentInfo)){
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
查询存在的数据。
查询不存在的数据。
检查两次恳求打印的log,不存在的数据成功被阻拦掉了,防止再去查询数据库,即便存在必定的误判率,也简直不会有啥影响,最多便是查询一次数据库。
尽管布隆过滤器能够有用的解决缓存穿透问题,并且完成的算法查找功率也很快。可是,也存在必定的缺陷,因为存在hash抵触的原因,一方面存在必定的误判率(某个在过滤器中并不存在的key,可是经过hash核算出来的下标值都为1)。另一方面,删去比较困难(假如将一个数组位置为0,那么这个位置有或许也代表其他key的值,会影响到其他的key)。
缓存击穿
缓存击穿是指拜访某个热门数据时,缓存中并不存在该数据或许缓存过期了,这个时分悉数的恳求压力给到了数据库。
基于上面的代码,来模仿短时刻内进行并发恳求,看看会不会将恳求悉数打到数据库。
public static void main(String[] args) throws InterruptedException {
/**
* 短时刻内并发恳求接口,并拜访同一个数据
*/
ExecutorService executorService = Executors.newFixedThreadPool(1000);
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executorService.execute(() -> {
HttpResponse response = HttpUtil.createGet("http://127.0.0.1:8081/doc/queryById?docId=1").execute();
System.out.println(response.body());
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
依据日志输出成果显示,恳求确实给到了数据库。 针对缓存击穿问题,有两种解决计划,一种是对热门数据不设置过期时刻,另一种是选用互斥锁的方法。
解决计划一:热门数据不设置过期时刻
热门数据不设置过期时刻,当后台更新热门数据数需求同步更新缓存中的数据,这种解决方法适用于不严格要求缓存一致性的场景。
解决计划二:运用互斥锁
假如是单机部署的环境下能够运用synchronized或lock来处理,保证一起只能有一个线程来查询数据库,其他线程能够等候数据缓存成功后在被唤醒,然后直接查询缓存即可。假如是分布式部署,能够选用分布式锁来完成互斥。
@Component
public class RedisLockUtil {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 模仿互斥锁
* @param key
* @param value
* @param exp
* @return
*/
public boolean tryLock(String key, String value, long exp) {
Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, value, exp, TimeUnit.SECONDS);
if (absent) {
return true;
}
return tryLock(key, value, exp); //假如线程没有获取锁,则在此处循环获取
}
/**
* 开释锁
* @param key
* @param value
*/
public void unLock(String key, String value) {
String s = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.equals(s, value)) { //防止锁被其他线程误删
stringRedisTemplate.delete(key);
}
}
}
有了上面的两个办法,能够对事务代码进行改造,在查询数据库前进行加锁,读取完成后在开释锁。
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
if (!b) {
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
String s = UUID.randomUUID().toString(); //给锁加个标识,防止误删
String lockKey = redisKey+"::lock";
boolean lock = redisLockUtil.tryLock(lockKey, s, 60); //尝试加锁
if (lock) {
try {
//假如加锁成功,先再次查询缓存,有或许上一个线程查询并增加到缓存了
obj = stringRedisTemplate.opsForValue().get(redisKey);
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if (ObjectUtil.isNotNull(documentInfo)) {
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
} finally {
redisLockUtil.unLock(lockKey, s); //开释锁
}
}
}
return documentInfo;
}
一顿梭哈后,再次模仿并发查询,看看最终作用,理想的成果状态应该是查询一次数据库,后边的查询直接经过缓存获取。经过日志输出能够看出来,击穿问题被有用解决啦。
缓存雪崩
缓存雪崩是指对热门数据设置了相同的过期时刻,在同一时刻这些热门数据key大批量产生过期,恳求悉数转发到数据库,然后导致数据库压力骤增,乃至宕机。与缓存击穿不同的是,缓存击穿是单个热门数据过期,而缓存雪崩是大批量热门数据过期。
针对缓存雪崩问题,常见的解决计划有多种,比方设置随机的过期时刻或许不设置过期时刻,建立高可用的缓存架构防止redis服务宕机,服务降级等。
解决计划一:设置随机的过期时刻
将key的过期时刻后边加上一个随机数,这个随机数值的规模能够依据自己的事务状况自行设定,这样能够让key均匀的失效,防止大批量的一起失效。
if (ObjectUtil.isNotNull(documentInfo)) {
//生成一个随机数
int randomInt = RandomUtil.randomInt(2, 10);
//过期时刻+随机数
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L+randomInt, TimeUnit.SECONDS);
}
解决计划二:不设置过期时刻
不设置过期时刻时,需求留意的是,在更新数据库数据时,一起也需求更新缓存数据,不然数据会呈现不一致的状况。这种方法比较适用于不严格要求缓存一致性的场景。
if (ObjectUtil.isNotNull(documentInfo)) {
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
}
解决计划三:建立高可用集群
缓存服务毛病时,也会触发缓存雪崩,为了防止因服务毛病而产生的雪崩,引荐运用高可用的服务集群,这样即便产生毛病,也能够进行毛病转移。集群相关的在之前也有介绍过Redis高可用架构建立到原理分析
数据一致性
通常状况下,运用缓存的直接意图是为了进步体系的查询功率,减轻数据库的压力。一般状况下运用缓存是下面这几步骤:
- 查询缓存,数据是否存在
- 假如数据存在,直接回来
- 假如数据不存在,再查询数据库
- 假如数据库中数据存在,那么将该数据存入缓存并回来。假如不存在,回来空。
这么搞如同看上去并没有啥问题,那么会有一个细节问题:当一条数据存入缓存后,马上又被修改了,那么这个时分缓存该怎么更新呢。不更新必定不行,这样导致了缓存中的数据与数据库中的数据不一致。一般状况下对于缓存更新有下面这几种状况:
- 先更新缓存,再更新数据库
- 先更新数据库,再更新缓存
- 先删去缓存,再更新数据库
- 先更新数据库,再删去缓存
先更新缓存,再更新数据库
先更新缓存,再更新数据库这种状况下,假如事务履行正常,不呈现网络等问题,这么操作不会有啥问题,两边都能够更新成功。可是,假如缓存更新成功了,可是当更新数据库时或许在更新数据库之前呈现了反常,导致数据库无法更新。这种状况下,缓存中的数据变成了一条实践不存在的假数据。 比方,在更新文章详情时,先修改了redis中的数据,在更新数据库前抛出一个反常来模仿数据库更新失利的场景。
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 先更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 模仿更新数据库前呈现反常
int i = 1 / 0;
// 在更新数据库
boolean b = this.updateById(documentInfo);
return b;
}
@PostMapping("/doc/updateDocument")
public Result<Boolean> updateDocument(@RequestBody DocumentInfo documentInfo) {
return Result.success(documentInfoService.updateDocument(documentInfo));
}
先调用更新的接口,在调用查询的接口,成果发现查询的接口回来的并不是数据库中的实践数据,这个时分就形成了缓存与数据库数据不一致的状况。
先更新数据库,再更新缓存
先更新数据库,再更新缓存和先更新缓存,再更新数据库的状况根本一致,假如失利,会导致数据库中是最新的数据,缓存中是旧数据。还有一种极点状况,在高并发状况下简单呈现数据掩盖的现象:A线程更新完数据库后,在要履行更新缓存的操作时,线程被堵塞了,这个时分线程B更新了数据库并成功更新了缓存,当B履行完成后线程A持续向下履行,那么最终线程B的数据会被掩盖。
@Override
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 更新数据库
boolean b = this.updateById(documentInfo);
//以标题为标识,模仿线程堵塞。当一个恳求的标题为‘模仿数据掩盖’时,线程停4秒
if(StrUtil.equals(documentInfo.getTitle(),"模仿数据掩盖")){
try {
Thread.sleep(4000);
}catch (Exception e){
}
}
// 更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 模仿更新数据库前呈现反常
return b;
}
先删去缓存,再更新数据库
先删去缓存,再更新数据库这种状况,假如并发量不大用起来不会有啥问题。可是在并发场景下会有这样的问题:线程A在删去缓存后,在写入数据库前产生了堵塞。这时线程B查询了这条数据,发现缓存中不存在,继而向数据库发起查询恳求,并将查询成果缓存到了redis。当线程B履行完成后,线程A持续向下履行更新了数据库,那么这时缓存中的数据为旧数据,与数据库中的值不一致。
先更新数据库,再删去缓存
先更新数据库,再删去缓存也并不是绝对安全的,在高并发场景下,假如线程A查询一条在缓存中不存在的数据(这条数据有或许过期被删去了),查询数据库后在要将查询成果缓存到redis时产生了堵塞。这个时分线程B发起了更新恳求,先更新了数据库,再次删去了缓存。当线程B履行成功后,线程A持续向下履行,将查询成果缓存到了redis中,那么此刻缓存中的数据与数据库中的数据产生了不一致。
解决数据不一致计划
延时双删
延时双删,即在写数据库之前删去一次,写完数据库后,再删去一次,在第2次删去时,并不是立即删去,而是等候必定时刻在做删去。
这个延时的功用能够运用mq来完成,这儿为了省事,偷个懒,本地测验运用的延时行列来模仿mq到达延时作用。首先需求界说一个行列元素方针DoubleDeleteTask。
@Data
public class DoubleDeleteTask implements Delayed {
private String key; // 需求删去的key
private long time; //需求推迟的时刻
public DoubleDeleteTask(String key, long time) {
this.key = key;
this.time = time + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(time, ((DoubleDeleteTask) o).time);
}
}
然后界说一个行列并交给spring管理。
@Configuration
public class DoubleDeleteQueueConfig {
@Bean(name = "doubleDeleteQueue")
public DelayQueue<DoubleDeleteTask> doubleDeleteQueue() {
return new DelayQueue<DoubleDeleteTask>();
}
}
设置一个独立线程,特意用来处理延时的使命。假如数据删去失利,能够自界说重试次数以保证数据的一致性,可是也会带来必定的功能影响,假如在实践项目中,建议仍是以异步的方法来完成重试。
@Slf4j
@Component
public class DoubleDeleteTaskRunner implements CommandLineRunner {
@Resource
private DelayQueue<DoubleDeleteTask> doubleDeleteQueue;
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final int retryCount = 3; //失利重试次数
@Override
public void run(String... args) throws Exception {
new Thread(() -> {
try {
while (true) {
DoubleDeleteTask take = doubleDeleteQueue.take(); //取出行列元素
String key = take.getKey();
try {
stringRedisTemplate.delete(key);
log.info("====延时删去key:{}====",key);
} catch (Exception e) { //失利重试
int count = 1;
for (int i = 1; i <= retryCount; i++) {
if (count <= retryCount) {
log.info("====延时删去key:{},失利重试次数:{}====",key,count);
Boolean r = stringRedisTemplate.delete(key);
if (r) {
break;
} else {
count++;
}
}else
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}, "double-delete-task").start();
}
}
运用延时行列,处理延时双删。最终测验,经过日志的打印能够确认完成了延时删去的功用。
@Override
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 更新数据库
boolean b = this.updateById(documentInfo);
// 再次延时删去缓存
doubleDeleteQueue.add(new DoubleDeleteTask(redisKey,2000L));
return b;
}
最终
在高并发的场景下,运用reids仍是存在许多坑的,稍不留意就会呈现缓存穿透,缓存雪崩等状况,严重的话能够直接形成服务宕机,所以在今后的开发中需求留意(假如项目没啥并发量的话,能够不必考虑)。