分布式锁运用场景

跟着互联网运用的高速发展,在电商运用中高并发运用场景涉及许多,例如:

  • 秒杀:在大规模的秒杀场景中,需求确保产品数量、约束用户购买数量, 防止用户购买数量的超限、防止呈现超卖情况;
  • 订单支付: 当用户下单付款时,需求对订单信息进行互斥操作以防止订单重复支付;
  • 提现操作:需求防止用户重复提现,防止造成财政损失。 **总结:**分布式锁运用场景能够分为两类: 1、同享资源的互斥拜访:当多个节点需求对同一个同享资源进行操作时,需求确保同一时刻只有一个节点能够操作,此刻就能够运用分布式锁; 2、分布式使命调度:分布式体系往往需求对使命进行调度,确保使命在多个节点的协作下履行。而在并行的使命履行进程中,需求区别哪些使命已经被分配并且正在被履行,哪些使命没有被分配。利用分布式锁来确保使命的正确性、次序性和稳定性。 概括地说,就是对多线程下,对同享变量操作,线程间是变量不行见,导致呈现并发问题,需求经过分布式锁来进行操控,今日就给我们经过事例,同享一下如何运用redisson完成分布式锁。

事例需求描述

库存中有200件产品,经过产品下单购买场景,运用分布式锁防止产品超卖问题。

Redisson环境预备

本地Redis环境安装

下载地址: github.com/tporadowski… 1、windows下安装 默认端口:6379 下载衔接 github.com/tporadowski… 解压 双击redis-server.exe发动服务端 双击redis-cli.exe发动客户端衔接服务端 在客户端输入 “ping”,呈现“PONG”,即证明衔接成功,部分装备能够在redis.conf文件修正;

Spring boot项目与redis集成

引进依赖

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson-spring-boot-starter</artifactId>
	<version>3.22.0</version>
</dependency>

创建redis衔接池代码

package com.zhc.config.redis;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * redisson 衔接池装备
 * @author zhouhengchao
 * @since 2023-06-19 20:29:00
 * @version 1.0
 */
@Configuration
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.database}")
    private Integer dataBase;
    @Bean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = serializer();
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // key选用String的序列化办法
        redisTemplate.setKeySerializer(StringRedisSerializer.UTF_8);
        // value序列化办法选用jackson
        redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
        // hash的key也选用String的序列化办法
        redisTemplate.setHashKeySerializer(StringRedisSerializer.UTF_8);
        //hash的value序列化办法选用jackson
        redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }
    /**
     * 此办法不能用@Ben注解,防止替换Spring容器中的同类型目标
     */
    public GenericJackson2JsonRedisSerializer serializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port).setDatabase(dataBase);
        // 设置redisson序列化办法,不然打开检查数据或许乱码
        Codec codec = new JsonJacksonCodec();
        config.setCodec(codec);
        return Redisson.create(config);
    }
}
redis的yaml文件装备
spring:
  redis:
    host: localhost
    port: 6379
    database: 0

扣减库存办法

/**
     * 从redis中获取库存,扣减库存数量
     */
    private void reduceStock(){
        // 从redis中获取产品库存
        RBucket<Integer> bucket = redissonClient.getBucket(REDIS_STOCK);
        int stock = bucket.get();
        if (stock > 0) {
            // 库存-1
            stock--;
            // 更新库存
            bucket.set(stock, 2, TimeUnit.DAYS);
            log.info("扣减成功,剩余库存:" + stock);
        } else {
            log.info("扣减失利,库存不足");
        }
    }

基于synchronized加锁操控

@GetMapping("/test01")
    public void test01(){
        for (int i = 0; i < 6; i++) {
            synchronized (this) {
                new Thread(this::reduceStock).start();
            }
        }
    }

我们经过了Synchronized锁,成功处理了多个线程争抢导致的超卖问题,可是有个问题,假定后期公司为了确保服务可用性。

将单击的运用,晋级称为了集群的形式,那么是否会有超卖问题呢?

经过nginx建立负载均衡

下载Nginx:​​nginx.org/download/ng… nginx.conf完好装备

worker_processes  1;
events {
    worker_connections  1024;
}
http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;
	upstream redislock{
    server localhost:8081 weight=1;
    server localhost:8082 weight=1;
}
    server {
        listen       80;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
			proxy_pass http://redislock;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

发动nginx,双击nginx.exe文件即可; 拜访运用:http://localhost/test01 发现存在超卖问题。

运用redis分布式锁

@GetMapping("/test02")
    public void test02(){
        // 分布式锁名称,关键是多个运用要同享这一个Redis的key
        String lockKey = "lockDeductStock";
        // setIfAbsent 假如不存在key就set值,并回来1
        //假如存在(不为空)不进行操作,并回来0,与redis命令setnx相似,setIfAbsent是java中的办法
        // 依据回来值为1就表明获取分布式锁成功,回来0就表明获取锁失利
        Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
        // 加锁不成功,回来给前端错误码,前端给用户友好提示
        if (Boolean.FALSE.equals(lockResult)) {
            log.info("体系繁忙,请稍后再试!");
            return;
        }
        reduceStock();
        // 事务履行完成,删去这个锁
        redisTemplate.delete(lockKey);
    }

1、主要运用setIfAbsent办法:假如不包括key就set值,并回来1; 假如存在(不为空)不进行操作,并回来0; 2、很明显,比get和set要好。因为先判断get,再set的用法,有或许会重复set值,与setnx相似。 以上redis加锁能够处理并发问题,可是存在问题: 1、假如setIfAbsent加锁成功,可是到事务逻辑代码时,该服务挂掉了,就会导致另一个服务一向获取不到锁,一向在等候中; 2、能够运用 redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey,30,TimeUnit.SECONDS),设置锁的key过期时刻,在规定时刻后key过期就能够再获取。

redis分布式锁优化

以上分布式锁还是存在问题,假如锁的key过期时刻与程序履行时刻差问题,例如

  • 假如锁key在程序履行完毕前过期,就会导致删去key失利;
  • 一起另一个运用获取了锁,又会被其他运用删掉锁,导致锁一向失效,存在并发问题。 能够经过引进UUId来处理锁被其他运用勿开释问题,如下代码:
@GetMapping("/test03")
    public void test03(){
        // 分布式锁名称,关键是多个运用要同享这一个Redis的key
        String lockKey = "lockDeductStock";
        // 分布式锁的值
        String lockValue = UUID.randomUUID().toString().replaceAll("-", 		"");
        // setIfAbsent 假如不存在key就set值,并回来1
        //假如存在(不为空)不进行操作,并回来0,与redis命令setnx相似,setIfAbsent是java中的办法
        // 依据回来值为1就表明获取分布式锁成功,回来0就表明获取锁失利
        Boolean lockResult = 		redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
        // 加锁不成功,回来给前端错误码,前端给用户友好提示
        if (Boolean.FALSE.equals(lockResult)) {
            log.info("体系繁忙,请稍后再试!");
            return ;
        }
        reduceStock();
        // 判断是不是当时恳求的UUID,假如是则能够正常开释锁。假如不是,则开释锁失利!
        if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) 		{
            redisTemplate.delete(lockKey);
        }
    }

还存在锁超时问题:锁超时问题,写一个定时使命,分线程每隔十秒去检查一次主线程是否持有这把锁,假如这个锁存在,从头将这个锁的超时时刻设置为30S,对锁延时,比较复杂。

运用redisson完成分布式锁

@GetMapping("/test04")
    public void test04(){
        // 分布式锁名称,关键是多个运用要同享这一个Redis的key
        String lockKey = "lockDeductStock";
        // 获取锁目标
        RLock redissonLock = redissonClient.getLock(lockKey);
        try {
            redissonLock.lock();
//            boolean result = redissonLock.tryLock();
            // 加锁不成功,回来给前端错误码,前端给用户友好提示
//            if (!result) {
//                log.info("体系繁忙,请稍后再试!");
//                return;
//            }
            reduceStock();
        }
        finally{
            if(redissonLock.isHeldByCurrentThread()){
                redissonLock.unlock();
            }
        }
    }

redisson分布式锁原理图:

【Java项目】高并发场景下,基于Redisson实现的分布式锁

【Java项目】高并发场景下,基于Redisson实现的分布式锁
关键办法介绍:

  • lock() 办法是堵塞获取锁的办法,假如当时锁被其他线程持有,则当时线程会一向堵塞等候获取锁,直到获取到锁或者发生超时或中断等情况才会完毕等候;
  • tryLock() 办法是一种非堵塞获取锁的办法,在测验获取锁时不会堵塞当时线程,而是立即回来获取锁的结果,假如获取成功则回来 true,不然回来 false. 总结:
  • lock()办法获取到锁之后能够确保线程对同享资源的拜访是互斥的,适用于需求确保同享资源只能被一个线程拜访的场景。Redisson 的 lock() 办法支撑可重入锁和公平锁等特性,能够更好地满意多线程并发拜访的需求;
  • tryLock() 办法支撑加锁时刻约束、等候时刻约束以及可重入等特性,能够更好地操控获取锁的进程和等候时刻,防止程序呈现长时刻无法呼应等问题。 在实践运用中需求依据详细场景和事务需求来选择适宜的办法,以确保程序的正确性和高效性。 视频中的内容假如对您有所协助,请给个三连加关注的支撑,欢迎在谈论区留言讨论,后续会进一步完善文档。