我们都知道,在单机环境下存在对共享资源的竞争时,需要通过“锁”机制来控制并发,避免多个线程同时修改同一个数据,进而造成数据不一致的情况,最经典莫过于转账问题,读者可以自行脑补。很多语言都提供所机制,比如Java中的 synchronized 关键字 和 ReentrantLock 类,互斥的资源在统一进程内,jvm 本身提供对锁的管理。但在分布式环境下,同样的服务分布在不同的主机,甚至是不同网络,单机的锁机制做到跨主机的协同,所以就需要通过架构来实现,常用的实现分布式锁有三种方式:

基于数据库唯一索引方式

利用数据库的唯一索引的排他机制,resource_key代表被加锁的资源标识,实现分布式锁。比如下面这张表:

CREATE TABLE `resource_lock` (
  `id` bigint(20) NOT NULL COMMENT '自增主键',
  `resource_key` varchar(255) NOT NULL COMMENT '锁名称',
  `gmt_created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `resource_key` (`resource_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='基于数据表的资源锁';

由于 resource_key 使用的是唯一索引,如果有多个请求同时要加锁时,数据库会保证只有一条加锁请求执行成功,从而实现了锁的效果,其余请求可以通过自旋的方式等待,知道锁释放为止:

    /**
     * 加锁
     * @param resourceKey
     * @throws InterruptedException 
     */
    public void lock(String resourceKey) throws InterruptedException {
        Connection conn = this.getConn();

        int count = 0;
        while(true) {
            try {
                count = conn.prepareStatement(
                        "INSERT INTO `resource_lock` (`id`, `resource_key`, `gmt_created`) VALUES (NULL, '" + resourceKey
                        + "', CURRENT_TIMESTAMP);")
                        .executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            if(count > 0) {
                return;
            }
            Thread.sleep(100);
        }
    }

解锁方式参考:

    /**
     * 解锁
     * @param resourceKey
     * @return
     */
    public boolean unlock(String resourceKey) {
        Connection conn = this.getConn();
        try {
            return conn.prepareStatement(
                    "DELETE FROM `resource_lock` WHERE  `resource_key` = '"+resourceKey+"'")
                    .executeUpdate() > 0;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return true;
    }

另外,还可以通过数据库的排他锁的机制实现分布式锁,在MYSQL中,排他锁可以在SELECT语句中加 FOR UPDATE关键字来实现,多条请求同时执行时只有一个请求可以获得排它锁,其他请求会被阻塞。代码例如:

    /**
     * 加锁
     */
    public boolean lock(String resourceKey) throws SQLException {
        Connection conn = this.getConn();
        conn.setAutoCommit(false);
        while(true) {
            try {
                ResultSet rs = conn
                        .prepareStatement(
                                "SELECT * FROM `resource_lock` WHERE resource_key = '" + resourceKey + "' FOR UPDATE;")
                        .executeQuery();
                if(rs != null) {
                    return true;
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

解锁代码:

    public void unlock(String resourceKey) throws SQLException {

        Connectionconn = this.getConn();

        conn.commit();

    }

​ 需要注意的是,MySql中行级别锁是基于索引的,否则使用表级锁,在高并发时行锁和表锁有明显的性能差异,所以再不考虑MySql的查询优化的情况下,在使用排他锁时需要将为查询字段添加索引,就像上例中的 resource_key。

利用缓存使用分布式锁

​ 通过缓存来实现分布式锁是比较常用的做法,这种方式的原始是通过缓存中间件某些原子操作特性,按照通过逻辑判断进行加锁判断。比如在Redis中,可以通过 SETNX(Set if not exits 的缩写) 指令来实现分布式锁,改指令表示如果值不存在则设置并且返回true,如果值已经存在则返回false,由于指令原子性,多条请求并发执行时,只有一条执行会执行成功,设置成功这表示加锁成功,通过缓存过期指令来释放锁。示例代码如下:

加锁:

    /**
     * 加锁
     */
    public boolean lock(String resourceKey) throws InterruptedException {
        while (true) {
            if (redis.setnx(resourceKey, "sth..") > 0) {
                return true;
            }
            Thread.sleep(50);
        }
    }

解锁:

    /**
     * 解锁
     */
    public booleanunlock(String resourceKey) {
        return redis.del(resourceKey) > 0;
    }

(补充redis,setx的公平锁的问题,避免手慢的永远拿不到锁)

另外,除了redis,memcached 缓存应用页非常广泛,其中的add指令有类似功能,也可以用来表示实现分布式锁,该指令表示如果key存在则添加成功并返回true,否则添加失败,返回false.

​ 还需要主要的是,文中的几个例子只是最简单的分布式锁的形式,仅仅展示了各种分布式锁的核心原理,实际在生产环境中还要面对问题具体问题,锁的公平性(TODO?)、链接超时、缓存服务器宕机、锁的等待时间、执行效率、死锁、重复释放等,需要的问题上综合权衡。

(放前面)比在在redis的实现中,可以在 setnx 之后设置通过expire指令设置超时时间(多条reids指令可以mulit/exec 指令实现原子性),以及自旋次数,避免由于解锁执行异常造成的死锁情况。

Zookeeper 提供了比较完善的分布式锁的机制:

Zookeeper 的第三方客户端 Curator 中已经封装好了分布式锁的实现类,InterProcessMutex,可以非常方便的加解锁,示例代码如下:

创建锁:

    public static final String ZNODE_PATH_FOR_LOCK = "/lock_for_user_submit";
    public static CuratorFramework zookeeperClient;

    private InterProcessMutex lock;

    public TestZookeeperLock() {
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
        org.apache.curator.connection.ConnectionHandlingPolicy a = null;
        zookeeperClient = CuratorFrameworkFactory.builder().retryPolicy(retry).connectString("192.168.56.101:2181").build();
        zookeeperClient.start();

        //创建分布式锁,同一个Znode路径表示同一个锁
        this.lock = new InterProcessMutex(zookeeperClient, ZNODE_PATH_FOR_LOCK);
    }

加锁过程:

    public boolean lock() {
        try {
            // 10000ms 为等待时间,该方法在返回之前会阻塞线程,成功获得锁之后会返回 true,否则返回 false
            return this.lock.acquire(10000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

解锁方法:

    public void unlock() {
        try {
            if (this.lock.isAcquiredInThisProcess()) {
                this.lock.release();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

zookeeper的分布式锁使用相同的路径,代表相同的锁,比如上例子中的 /lock_for_user_submit。其家加解锁的过程如下:

  1. 每词取锁请求会在指定的锁节点下创建一个新的有序节点,登录zookeepr之后可以查看已经创建的有序子节点:

    [zk: localhost:2181(CONNECTED) 28] ls /lock_for_user_submit
    
    [_c_df74c204-80fa-45ff-9015-0033a4c62cdf-lock-0000000024,\
    
    c1cd1c9e9-ea4d-443b-b0d8-95458b0c0738-lock-0000000025, \
    
    c2522d2ae-57a7-43fb-ba10-773cfa962b09-lock-0000000026]
    
  1. 判断是否获得锁时,会获得锁节点(例子中的/lock_for_user_submit)下的所有子节点,如果当前线程创建的子节点是序号最小的子节点,则表示已经获得锁。否则,阻塞线程,并监听最小接节点变化。

  2. 当释放锁锁时,会删除锁节点下对应的序号最小子节点,并通知其余处于监听状态的线程重新尝试获取锁。

(补充Curator的Lock其他用法:参考:http://www.aboutyun.com/thread-10725-1-1.html)

总结一下:

基于数据库的分布式锁比较容易实现,而不需要维护额外的中间件,但存在单点问题,数据库宕机会拖累业务,而且频繁数据库操作性能不佳, 加锁期间或占用数据路链接,影响其他数据库操作。基于缓存的方式性能更好,实现起来门槛也并不高,实际生产环境中的缓存也非常广泛。

Zookeeper分布式锁可靠性更高,Zookeeper集群具有多级冗余的灾备能力,避免单点问题,有序节点可以保证过个请求获得锁的公平性。缺点是锁节点的创建、删除和节点监听会有一定额性能开销,从效率和维护成本上略重。

总体而言,有限选择基于缓存的方式,在实际生产环境下,多数产品都会使用缓存,随意具备一定的环境基础。如果是小规模的临时需求,手头有没有可用的缓存服务器,可以用基于数据表的方式来实现。如果你想追求更高的可靠性,且有精力维护一个zookeeper集群,可以使用zookeeper方案。

results matching ""

    No results matching ""