分布式锁简记

2017-11-25 08:19:46

需求

工作中接到一个任务,需要接入一个外部快递系统。
外部系统中有门店和订单的概念。 快递是从门店地址寄到收件人手中.
在提交一个快递单时, 如果寄件的门店不存在, 要先创建门店才能下单

方案一: 我们本地系统中也有门店概念,可以在本地系统创建门店时, 同步到外部系统创建门店。
问题: 造成本地系统对外部系统依赖过大

方案二:下单前先查询本地门店缓存(非本地系统门店)是否存在,不存在再查询外部系统中是否已创建门店,如果没有,就需要先创建门店并跟新到本地门店缓存,然后再直接下单。
流程如下:

问题: 如果并发的进程同时查询到门店不存在, 会同时创建门店, 导致失败.

一开始的想法是,使用分布式锁避免重复创建门店。
这个有点类似于java的双重检查。

问题:复杂度过高,性能差

最终并没有使用该方法, 但还是借机了解了分布式锁的基本实现。

分布式锁

接口

public interface DisLock {

    /**
     * 申请加锁
     * @param timeSecond    最长等待时间 超时直接返回false
     * @return
     * @throws InterruptedException
     */
    boolean tryLock(int timeSecond) throws InterruptedException;

    /**
     * 释放锁
     */
    void unLock();
}

public interface DisLockFactory {
    /**
     * 生成一个锁
     * @param lockName
     * @return
     */
    DisLock getLock(String lockName) ;
}

redis

redis官方提出了用Redis实现分布式锁管理器的算法- RedLock, 使用很方便。

gradle引用RedLock java实现包redisson

"org.redisson:redisson:3.5.4"

redisson中提供了RLock接口, 该接口提供了lock方法,可以直接使用

import com.dis.lock.DisLock;
import org.redisson.api.RLock;


import java.util.concurrent.TimeUnit;

public class RedissonLock implements DisLock {

    private RLock lock;

    protected RedissonLock(RLock lock) {
        this.lock = lock;
    }

    @Override
    public boolean tryLock(int timeSecond) throws InterruptedException {
        return lock.tryLock(timeSecond, TimeUnit.SECONDS);
    }

    @Override
    public void unLock() {
        lock.unlock();
    }
}

看看Factory实现:

public class RedissonLockFactory implements DisLockFactory {
    private RedissonClient client;
    public RedissonLockFactory(Config config) {
        client = Redisson.create(config);
    }

    public static RedissonLockFactory simpleFactory() {
        Config config = new Config();
        config.useSingleServer().setAddress("http://127.0.0.1:6379").setDatabase(0);
        return new RedissonLockFactory(config);
    }


    @Override
    public DisLock getLock(String lockName) {
        return new RedissonLock(client.getLock(lockName));
    }
}

redission支持4种连接redis方式,分别为单机、主从、Sentinel、Cluster 集群,

simpleFactory中使用了最简单的单机模式, 连接本机的redis。

参考:
《Redis官方文档》用Redis构建分布式锁_ifeve.com
Redisson分布式锁浅析

zookeeper

Zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制,非常有利于锁的实现。

思路:

实现代码:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZkLock implements DisLock{
    private ZooKeeper zk;

    private String lockPath;

    private String lockedNode;

    protected ZkLock(ZooKeeper zk, String lockPath) throws KeeperException, InterruptedException {
        this.zk = zk;
        this.lockPath = lockPath;

        // 创建加锁的路径
        if(zk.exists(lockPath, false) == null) {
            try {
                zk.create(lockPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException ke) {
                if(ke.code() == KeeperException.Code.NODEEXISTS) {

                } else {
                    throw  ke;
                }
            }
        }
    }

    @Override
    public boolean tryLock(int timeSecond) throws InterruptedException {

        String path = null;
        try {
            // 创建一个临时顺序节点
            path = zk.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 节点名
            String lockNode = path.substring(lockPath.length() + 1);

            return acquireLock(lockNode, timeSecond, null);
        } catch (KeeperException e) {
            if(path != null) {
                try {
                    zk.delete(path, -1);
                } catch (KeeperException e1) {
                }
            }

            throw new RuntimeException("try lock fail", e);
        }
    }

    /**
     * 申请加锁
     */
    private boolean acquireLock(String lockNode, int timeSecond, CountDownLatch existLatch) throws KeeperException, InterruptedException {
        // 获取所有的子节点,就是所有的已加锁节点
        List<String> allLock = zk.getChildren(lockPath, false);

        // 排序
        Collections.sort(allLock);
        // 当前请求锁的位置
        int lockIndex = allLock.indexOf(lockNode);


        if(lockIndex < 0) {
            // 出错了
            return false;
        } else if(lockIndex == 0) {     // 当前请求锁节点编号最小, 加锁成功

            lockedNode = lockNode;
            return true;
        } else {    // 当前请求锁节点编号不是最小
            if(timeSecond <= 0) {
                return false;
            }

            // 获取前一个节点
            String preLock = allLock.get(lockIndex - 1);

            // CountDownLatch用于阻塞当前线程, 以等待锁
            // 第一次申请加锁时,要创建CountDownLatch, 再次监听时, 不再创建新的CountDownLatch
            CountDownLatch newLatch = null;
            if(existLatch == null) {
                newLatch = new CountDownLatch(1);
                existLatch = newLatch;
            }

            // 监听前一个节点, 如果前一个节点发生变化(如删除), 当前节点重新获取锁
            watchPath(lockPath + "/" + preLock, lockNode, timeSecond, existLatch);

            // 第一次申请加锁时,要阻塞当前线程
            return newLatch == null ? false : newLatch.await(timeSecond, TimeUnit.SECONDS);
        }
    }

    private void watchPath(String path, String lockNode, int time, CountDownLatch latch) throws InterruptedException, KeeperException {

        zk.exists(path, new Watcher() {
            public void process(WatchedEvent event) {
                // 重新申请加锁
                try {
                    if(acquireLock(lockNode, time, latch)) {
                        // 继续进行线程
                        latch.countDown();
                    }
                } catch (Exception e) {
                    throw new RuntimeException("try lock fail", e);
                }
            }
        });

    }

    @Override
    public void unLock() {
        try {
            // 删除节点
            zk.delete(lockPath + "/" + lockedNode, -1);
        } catch (Exception e) {
            throw  new RuntimeException("delete fail : " + lockPath + "/" + lockedNode, e);
        }
    }
}

factory:

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

public class ZkLockFactory implements DisLockFactory{
    private ZooKeeper zk;

    public static ZkLockFactory simpleFactory() throws IOException {
        return new ZkLockFactory(new ZooKeeper("127.0.0.1:2181", 10000, null));
    }

    public ZkLockFactory(ZooKeeper zk) {
        this.zk = zk;
    }


    @Override
    public DisLock getLock(String lockName)  {
        try {
            return new ZkLock(zk, "/lock/" + lockName);
        } catch (Exception e) {
            throw new RuntimeException("create lock fail", e);
        }
    }
}

测试

public class Work implements Runnable{
    private DisLock lock;
    private String name;
    public Work(DisLock lock) {
        this.lock = lock;

        String s = String.valueOf(System.currentTimeMillis());
        this.name = s.substring(6, s.length());
    }

    @Override
    public void run() {
        try {
            if (lock.tryLock(10)) {
                try {
                    System.out.println(name + " lock success" );

                    System.out.println(name + " start work... ");
                    Thread.sleep(1000);
                    System.out.println(name + " end work...");
                } finally {
                    lock.unLock();

                    System.out.println(name + " unlock success");
                }
            } else {
                System.out.println("lock fail");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


public class LockTest {

    @Test
    public void test() throws IOException {
        // DisLockFactory factory = ZkLockFactory.simpleFactory();
        DisLockFactory factory = RedissonLockFactory.simpleFactory();

        ExecutorService service = Executors.newFixedThreadPool(4);

        for(int i = 0; i < 10; i++) {
            service.execute(new Work(factory.getLock("anyLock")));
        }

        try {
            Thread.sleep(1000 * 100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

参考:
分布式学习(2) —- Zookeeper实现分布式锁
【Zookeeper系列四】ZooKeeper 分布式锁实现

最终实现

由于添加门店时,如果门店已存在,外部系统会返回特定的错误code, 所以这时只需要再读取远程的门店信息, 就可以了,不必要使用分布式锁。

这里只是一个非常不严谨的分布式锁的小栗子, 还有很多问题, 实际上分布式锁要考虑全面非常复杂, 可参考:
基于Redis的分布式锁到底安全吗(上)?
基于Redis的分布式锁到底安全吗(下)?