【分布式锁】---简单明了


介绍分布式锁

单机多线程: 在传统的单机服务中,涉及到并发相关的场景,通常使用的 ReetrantLock 类、synchronized 关键字来控制并发,或者是在特殊的场景的,可能会使用Semaphore、CountDownLatch、ReadWriteLock等锁来控制并发访问。

分布式系统: 在分布式系统,如果继续使用传统的锁,不可以满足自己的想要的结果,因为不同的服务/客户端通常运行在独立的 各自的JVM 进程上,多个 JVM 进程共享同一份资源的话,此时就考虑分布式锁。

在一个最基本的分布式锁需要同时满足:

  • 互斥性:在任何时刻,对于同一条数据,只有一台应用可以获取到分布式锁;
  • 高可用性:在分布式场景下,一小部分服务器宕机不影响正常使用,这种情况就需要将提供分布式锁的服务以集群的方式部署;
  • 防止锁超时:如果客户端没有主动释放锁,服务器会在一段时间之后自动释放锁,防止客户端宕机或者网络不可达时产生死锁;
  • 独占性:加锁解锁必须由同一台服务器进行,也就是锁的持有者才可以释放锁,不能出现你加的锁,别人给你解锁了。

分布式锁的常用实现方式

  1. 基于 redis 缓存实现分布式锁
  2. 基于 zookeeper 实现分布式锁
  3. 基于 mysql 实现分布式锁

三种方式的进行比对:

考虑在性能方面: redis > zookeeper > 数据库
考虑复杂性或者难度: Zookeeper > redis > 数据库
考虑可靠性: Zookeeper > redis = 数据库

一、 redis-分布式锁

1、redis

客户端借助 redis 中的命令 setnx(key value) 该命令作用在创建的值会进行判断,如果该值存在则忽略,不存在则创建。运用到分布式中,多个客户端发送setnx命令,只用一个客户端可以执行成功

在这里插入图片描述
存在问题:

  • 死锁 : 当一个客户端设置成功后,宕机无法释放key,则存在死锁问题;
  • 误删 : 因为锁得名称是一样,就会存在一个客户端执行完,刚好自己设置锁到期了,仍要执行delete操作,此时就会把其他客户端的锁删除掉

1.1、处理死锁

在使用redis客户端 setnx 值的同时可额外添加参数

  • 参数一:key
  • 参数二:value
  • 参数三:过期时间
  • 参数四::时间单位

此时我们可以原子性的在设置的同时,设置值的过期时间的来防止死锁的问题
在这里插入图片描述
在这里插入图片描述

1.2、处理误删

存在原因: 因为锁得名称是一样,就会存在一个客户端执行完,刚好自己设置锁到期了,仍要执行delete操作,此时就会把其他客户端的锁删除掉。
在这里插入图片描述

处理办法: setnx设置值获取锁的同时,设置的一个当前线程的唯一的值,比如uuid,在释放锁的之前,先进行判断是否释放的是自己的锁。但是存在新的问题,判断和删除锁,无法同时进行,因为redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。总的来说,中间可能存在问题。
只能借助与 lua脚本 进行实现。本文章不介绍,语法相对而言,比较容易理解
在redis中需要通过eval命令执行lua脚本。

  • 参数一:lua脚本语句
  • 参数二:值得数量
  • 参数key: 获取使用 KEYS[1]
  • 参数arg: 获取使用 ARGV[1]

EVAL script numkeys key [key …] arg [arg …]

EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 输出:10 20 60 70

在代码中使用 redis.call() 来执行lua脚本。

        //脚本解读:
            //获取当前锁:KEYS[1] 是否等于ARGV[1]
            //成立,则进行删除 key为 KEYS[1] 得值(锁)
            //反之 返回0
            //脚本if语句的结束end
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                "then return redis.call('del', KEYS[1]) " +
                "else return 0 " +
                "end";
        //参数一:lua脚本的,执行结束返回是boolean类型
        //参数二:锁的key,用来比对的key为lock的值是否等于 uuid
        redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList("Lock"),uuid);

2、可重入锁

以上功能实现的是基于redis提供的 SETNX ,一旦键存在就无法再设置成功,无法实现在单机服务中重入锁的功能,一个客户端可以重复获取锁来执行代码逻辑。
解决办法,利用 redis 提供给数据类型 hash(哈希表) 来进行实现可重入功能,通过设置redis hash 存储的重入次数,利用 lua脚本进行实现。
###2.1、加锁代码:

        //脚本解读
            //判断hash是否存在,或者hash值是否等1
            //成立则,进行的值+1,同时更新的过期时间
        String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        //参数一:lua脚本
        //参数二:锁名称
        //参数key:hash的值
        //参数ary:重新设置的过期时间的单位
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){
            Thread.sleep(50);
        }

###2.2、解锁代码:

        //脚本解读
            //判断hash是否存在,不存在得返回nil
            //判断减1操作后的,值是否等于0,成立则直接删除hash
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "then " +
                "   return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());
        if (flag == null){
            System.out.println("这把锁不是你,乱搞");
        }

3、阶段总结

3.1、升级阶段

  1. 独占排他使用 SETNX
  2. 防死锁发生 : 如果reids客户端从redis获取到锁之后立马宕机
    处理办法:原子性操作,在设置值的同时,设置过期时间
  3. 防误删 : 删除不是自己锁,导致其他客户端的异常
    处理办法:先判断是否自己的锁,才能删除,使用 lua脚本进行实现
  4. 可重入锁:hash + lua脚本
  5. 自动续期: 定时器 + lua脚本

3.2、加锁操作

  1. setnx:存在问题 : 独占排他、死锁、不可重入、原子性
  2. set key value ex 30 nx : 存在问题 : 独占排他、死锁 不可重入
  3. hash + lua脚本 : 可重入锁
    判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby),并设置过期时间(expire)
    如果锁占用 ,则判断是否当前线程占用,如果是则重入(hexests)(hincrby)并重置过期时间(expire)
    否则获取锁失败,返回标识
  4. 定时器 + lua脚本: 实现锁的自动续期
    判断当前锁的,是否是自己的锁(hexists == 1),如果是自己的锁,expire重置过期时间

3.3、解锁操作

  1. delete : 可以能够造成误删其他客户端的锁
  2. 先判断在删除同时保证原子性:lua脚本
  3. hash + lua 脚本:可重入
    判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常
    存在则直接减1(hincrby-1),判断减1后是否为0,为0则释放锁(del)并返回1
    不为0,则返回0

重试:递归 循环

3、redLock(红锁)

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 应用程序使用相同的kv值,从多个redis实力中获取锁,在获取的同时设置有超时间,如果当前节点超过一定的超时时间依然没有获取到锁,则直接放弃,尽快尝试从下一个健康的redis节点获取锁,以避免被一个宕机的节点阻塞
  3. 计算获取的锁的时间 = 客户端程序的系统当前时间 - step1中的时间。获取锁的消耗时间小于总的锁定时间(30s)并且半数以上节点获取锁成功,认为获取锁成功
  4. 计算剩余锁定时间 = 总的锁定时间 - 第三步的消耗时间
  5. 如果获取锁失败了,针对所有节点进行释放锁操作

文章暂不重点介绍红锁的实现步骤,知道起作用足以,在开发环境很少使用到的红锁。

4、Redisson

官方文档地址:https://github.com/redisson/redisson/wiki
​ Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务…
详细看官方文档,文章重点使用到的 redisson 中提供的分布式锁

4.1、配置

    @Bean
    public RedissonClient redissonConfig(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.200.250:6379");  //redis 服务地址
//                .setDatabase(0)                                             //指定redis数据库编号
//                .setUsername("").setPassword("")                            //redis用户名、密码
//                .setConnectionMinimumIdleSize(10)                           //线程池最小空闲连接数
//                .setConnectionPoolSize(50)                                 //线程池最大连接数
//                .setIdleConnectionTimeout(60000)                           //线程超时时间
//                .setConnectTimeout(6000)                                   //客户端程序获取redis连接的超时时间
//                .setTimeout(6000);                                         //响应超时时间
        return Redisson.create(config);
    }

4.2、可重入锁

基于Redis的Redisson分布式可重入锁RLock 类 实现了 java.util.concurrent.locks.Lock 接口。
如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

代码层使用

    @Autowired
    private RedissonClient redissonClient;

    @Test
    void redisClient() {
        // 加锁,获取锁失败重试
        RLock lock = redissonClient.getLock("lock");
        lock.lock();	//可选参数来指定加锁的时间。超过这个时间后锁便自动解开了。
        
        //代码一系列操作........
        
        // 释放锁
        lock.unlock();

    }

4.3、公平锁

类似 ReentrantLock 机制,可实现公平和非公平的 锁的设置
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待一段时间后才能执行后面线程,以此类推

    @Test
    void redisClient() {
        // 加锁,获取锁失败重试
        RLock failLock = redissonClient.getFairLock("failLock");
        failLock.lock();
        
        //代码一系列操作........

        // 释放锁
        failLock.unlock();

    }

4.4、连锁、红锁

连锁 - 基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();

 //代码一系列操作........
 
lock.unlock();

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();

 //代码一系列操作........
 
lock.unlock();

4.5、读写锁

分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
几种场景:

  • A 和 B 同时进行写操作,A抢到锁开始写操作,B需要等待A结束
  • A 和 B 同时进行读操作,无需等待,直接读
  • A 和 B 前写后读,读需要等待写操作后,才能进行读操作
  • A 和 B 前读后写,写需要等待读操作后,才能进行写操作
        RReadWriteLock rReadWriteLock = redissonClient.getReadWriteLock("lock");
        
        //读加锁
        rReadWriteLock.readLock().lock();
        //读解锁
        rReadWriteLock.readLock().unlock();

        //写加锁
        rReadWriteLock.writeLock().lock();
        //写解锁
        rReadWriteLock.writeLock().unlock();

4.6、semaphore(信号量)

限制同时访问的线程数量:通过指定许可证数量来控制同一时间内可以访问某个资源的线程数量。

        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.trySetPermits(3);
        //执行1次
        semaphore.acquire();
        //当 acquire 执行3次后,才能进行加锁
        semaphore.release();

举一个简单例子,有一个的5车位的停车场,在场地未满之前,任何车辆都可以停用,一旦五个车位被占满的时候,停车场大门关闭,场外车辆无法再次进入,只有等到的场地内的车辆的出来一辆,大门打开,而且只能进来的一辆车,进来后大门再次关闭。以此类推!!

4.7、CountDownLatch(闭锁)

使一个或多个线程等待其他线程完成某个操作后再继续执行。

        RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");

        countDownLatch.trySetCount(3);
        //执行1次
        countDownLatch.await();
        //当 await 执行3次后,才能进行加锁
        countDownLatch.countDown();

举一个简单例子,路标小摊,买烤冷面,就买5份,买完手工回家,手工等于加锁

二、zookeeper-分布式锁

Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。并且每个节点都是唯一的
zookeeper中提供的节点有四种类型,区分: 永久/临时 和 序列化/非序列化

  1. PERSISTENT: 永久节点,一旦创建成功,出现断开连接情况,节点仍存在;
    创建方式: create /lock “Hello Zookeeper”
  2. EPHEMERAL: 临界节点,创建成功后,出现断开连接情况,节点将会消失;
    创建方式: create -e /lock “Hello Zookeeper”
  3. PERSISTENT_SEQUENTIAL: 永久序列化节点,断开连接仍在,并在创建节点的同时进行顺序编号;
    创建方式: create -s /lock “Hello Zookeeper”
  4. EPHEMERAL_SEQUENTIAL: 临时序列化节点,断开连接消息,并在创建节点的同时进行顺序编号;
    创建方式: create -s -e /lock “Hello Zookeeper”

在Zookeeper分布式锁,同样也使用到了事件的监听,类型Vue中的watch中的监听函数的,但是在的zookeeper中的使用的关键字是 stat 。

  • 节点创建:stat -w /xx ---->>>> 当监听的/xxx触发时—NodeCreated
  • 节点删除:stat -w /xx ---->>>> 当监听/xx节点删除时—NodeDelete
  • 节点数据修改:get -w /xx ---->>>> 当监听/xx节点数据发生变化时—NodeDataChanged
  • 子节点变更:ls -w /xx ---->>>> 当监听/xx节点的子节点创建或者删除时–NodeChildChanged

同样Zookeeper也提供了:原生客户端、ZkClient、Curator框架

代码不再进行编写,类似reids 客户端,进行命令操作客户端的提供的接口,实现zookeeper的特性,

2.1、实现思路

  1. 获取锁:多个客户端争抢去创建zookeeper节点,创建成功则获取锁
  2. 释放锁:delete节点即可。
  3. 如果没有抢到锁,进行循环重试

同样需要考虑redis中的出现的各种问题,防死锁、重入问题、防误删、原子性等

2.2、相关问题

2.2.1、防死锁

可以利用的zookeeper提供的 临时节点 的特性,当节点的创建成功后,一定那出现的客户点宕机,过一段时间后zookeeper没有收到客户端的心跳则会判断该端会话失效,从而将临时节点进删除。

2.2.2、阻塞锁

采用临时序列化节点

  • 所有请求要求获取锁的同时,给每一个请求创建一个临时序列化节点,
  • 获取到当前节点的前一个节点,如果前一个节点为空的话,则说明该节点最小,则有限获取锁,否则利用监听机制监听前一个节点的变化
  • 获取锁成功执行业务操作后,释放当前的节点,此时后一个节点正在监听该节点,则后一个节点开始执行他的相关业务操作
    这样操作的同时,同样也实现了公平锁的特征

2.2.3、可重入

  1. 可以利用在创建节点的同时,利用节点内容存储 标志数据,比如 服务器信息、线程信息、以及重入次数的信息
  2. 借助ThreadLocal,线程的局部,隔离线程间的影响,线程私有记录相关信息

2.3、Curator

Curator简化了Zookeeper客户端的开发量。

通过查看官方文档,可以发现Curator主要解决了三类问题:

  1. 封装ZooKeeper client与ZooKeeper server之间的连接处理
  2. 提供了一套Fluent风格的操作API
  3. 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
    Curator由一系列的模块构成
    常用的是curator-framework和curator-recipes:
  • curator-framework:提供了常见的zk相关的底层操作
  • curator-recipes:提供了一些zk的典型使用场景的参考。本节重点关注的分布式锁就是该包提供的
@Configuration
public class CuratorConfig {
    @Bean
    public CuratorFramework curatorFramework(){
        // 重试策略,这里使用的是指数补偿重试策略,重试3次,初始重试间隔1000ms,每次重试之后重试间隔递增。
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        // 初始化Curator客户端:指定链接信息 及 重试策略
        CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.116.100:2181", retry);
        client.start(); // 开始链接,如果不调用该方法,很多方法无法工作
        return client;
    }
}

2.3.1、可重入锁InterProcessMutex

Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类InterProcessMutex来实现。

@Autowired
private CuratorFramework curatorFramework;

public void checkAndLock() {
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/locks/lock");
    try {
        // 加锁
        mutex.acquire();
		//业务代码逻辑。。。。。。

        // 释放锁
        mutex.release();
        
    } catch (Exception e) {
        e.printStackTrace();
    }
}

2.3.2、不可重入锁InterProcessSemaphoreMutex

具体实现:InterProcessSemaphoreMutex。与InterProcessMutex调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。

@Autowired
private CuratorFramework curatorFramework;

public void deduct() {

InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, "/locks/lock");
    try {
    	//加锁
        mutex.acquire();
        //业务代码逻辑。。。。。。
        
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        	//解锁
            mutex.release();
    }
}

2.3.3、可重入读写锁InterProcessReadWriteLock

类似JDK的ReentrantReadWriteLock。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁。从读锁升级成写锁是不成的。主要实现类InterProcessReadWriteLock:

// 构造方法
InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/locks/rwlock");

// 获取读锁对象
InterProcessMutex readLock();


// 获取写锁对象
InterProcessMutex writeLock();

2.3.4、 联锁InterProcessMultiLock

Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。实现类InterProcessMultiLock

// 构造函数需要包含的锁的集合,或者一组ZooKeeper的path
public InterProcessMultiLock(List<InterProcessLock> locks);
public InterProcessMultiLock(CuratorFramework client, List<String> paths);

// 获取锁
public void acquire();
public boolean acquire(long time, TimeUnit unit);

// 释放锁
public synchronized void release();

2.3.5、信号量InterProcessSemaphoreV2

一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。主要实现类InterProcessSemaphoreV2:

// 构造方法
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);

// 注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。
// 同时还提供了超时的重载方法
public Lease acquire();
public Collection<Lease> acquire(int qty);
public Lease acquire(long time, TimeUnit unit);
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

// 租约还可以通过下面的方式返还
public void returnAll(Collection<Lease> leases);
public void returnLease(Lease lease);

三、mysql-分布式锁

mysql实现分布式锁主要是利用了唯一键索引不能重复插入的特点实现。创建一个表后

实现思路:

  1. 多个客户端执行insert语句,只有一个客户端插入功能,表示获取到锁,进行相关逻辑代码操作
  2. 当业务逻辑执行完毕后,delelte删除该条记录,即为释放锁
  3. 重试,递归

解决死锁,重入,误删、重入等相关问题
给表中添加对应的字段,每次进行操作的都会进行比对,成功后,则开始执行相关业务逻辑代码