我们都知道,在单机环境下存在对共享资源的竞争时,需要通过“锁”机制来控制并发,避免多个线程同时修改同一个数据,进而造成数据不一致的情况,最经典莫过于转账问题,读者可以自行脑补。很多语言都提供所机制,比如Java中的 synchronized 关键字 和 ReentrantLock 类,互斥的资源在统一进程内,jvm 本身提供对锁的管理。但在分布式环境下,同样的服务分布在不同的主机,甚至是不同网络,单机的锁机制做到跨主机的协同,所以就需要通过架构来实现,常用的实现分布式锁有三种方式:
基于数据库唯一索引方式
利用数据库的唯一索引的排他机制,resource_key代表被加锁的资源标识,实现分布式锁。比如下面这张表:
CREATE TABLE `resource_lock` (
`id` bigint(20) NOT NULL COMMENT '自增主键',
`resource_key` varchar(255) NOT NULL COMMENT '锁名称',
`gmt_created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `resource_key` (`resource_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='基于数据表的资源锁';
由于 resource_key 使用的是唯一索引,如果有多个请求同时要加锁时,数据库会保证只有一条加锁请求执行成功,从而实现了锁的效果,其余请求可以通过自旋的方式等待,知道锁释放为止:
/**
* 加锁
* @param resourceKey
* @throws InterruptedException
*/
public void lock(String resourceKey) throws InterruptedException {
Connection conn = this.getConn();
int count = 0;
while(true) {
try {
count = conn.prepareStatement(
"INSERT INTO `resource_lock` (`id`, `resource_key`, `gmt_created`) VALUES (NULL, '" + resourceKey
+ "', CURRENT_TIMESTAMP);")
.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
if(count > 0) {
return;
}
Thread.sleep(100);
}
}
解锁方式参考:
/**
* 解锁
* @param resourceKey
* @return
*/
public boolean unlock(String resourceKey) {
Connection conn = this.getConn();
try {
return conn.prepareStatement(
"DELETE FROM `resource_lock` WHERE `resource_key` = '"+resourceKey+"'")
.executeUpdate() > 0;
} catch (SQLException e) {
e.printStackTrace();
}
return true;
}
另外,还可以通过数据库的排他锁的机制实现分布式锁,在MYSQL中,排他锁可以在SELECT语句中加 FOR UPDATE关键字来实现,多条请求同时执行时只有一个请求可以获得排它锁,其他请求会被阻塞。代码例如:
/**
* 加锁
*/
public boolean lock(String resourceKey) throws SQLException {
Connection conn = this.getConn();
conn.setAutoCommit(false);
while(true) {
try {
ResultSet rs = conn
.prepareStatement(
"SELECT * FROM `resource_lock` WHERE resource_key = '" + resourceKey + "' FOR UPDATE;")
.executeQuery();
if(rs != null) {
return true;
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
解锁代码:
public void unlock(String resourceKey) throws SQLException {
Connectionconn = this.getConn();
conn.commit();
}
需要注意的是,MySql中行级别锁是基于索引的,否则使用表级锁,在高并发时行锁和表锁有明显的性能差异,所以再不考虑MySql的查询优化的情况下,在使用排他锁时需要将为查询字段添加索引,就像上例中的 resource_key。
利用缓存使用分布式锁
通过缓存来实现分布式锁是比较常用的做法,这种方式的原始是通过缓存中间件某些原子操作特性,按照通过逻辑判断进行加锁判断。比如在Redis中,可以通过 SETNX(Set if not exits 的缩写) 指令来实现分布式锁,改指令表示如果值不存在则设置并且返回true,如果值已经存在则返回false,由于指令原子性,多条请求并发执行时,只有一条执行会执行成功,设置成功这表示加锁成功,通过缓存过期指令来释放锁。示例代码如下:
加锁:
/**
* 加锁
*/
public boolean lock(String resourceKey) throws InterruptedException {
while (true) {
if (redis.setnx(resourceKey, "sth..") > 0) {
return true;
}
Thread.sleep(50);
}
}
解锁:
/**
* 解锁
*/
public booleanunlock(String resourceKey) {
return redis.del(resourceKey) > 0;
}
(补充redis,setx的公平锁的问题,避免手慢的永远拿不到锁)
另外,除了redis,memcached 缓存应用页非常广泛,其中的add指令有类似功能,也可以用来表示实现分布式锁,该指令表示如果key存在则添加成功并返回true,否则添加失败,返回false.
还需要主要的是,文中的几个例子只是最简单的分布式锁的形式,仅仅展示了各种分布式锁的核心原理,实际在生产环境中还要面对问题具体问题,锁的公平性(TODO?)、链接超时、缓存服务器宕机、锁的等待时间、执行效率、死锁、重复释放等,需要的问题上综合权衡。
(放前面)比在在redis的实现中,可以在 setnx 之后设置通过expire指令设置超时时间(多条reids指令可以mulit/exec 指令实现原子性),以及自旋次数,避免由于解锁执行异常造成的死锁情况。
Zookeeper 提供了比较完善的分布式锁的机制:
Zookeeper 的第三方客户端 Curator 中已经封装好了分布式锁的实现类,InterProcessMutex,可以非常方便的加解锁,示例代码如下:
创建锁:
public static final String ZNODE_PATH_FOR_LOCK = "/lock_for_user_submit";
public static CuratorFramework zookeeperClient;
private InterProcessMutex lock;
public TestZookeeperLock() {
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
org.apache.curator.connection.ConnectionHandlingPolicy a = null;
zookeeperClient = CuratorFrameworkFactory.builder().retryPolicy(retry).connectString("192.168.56.101:2181").build();
zookeeperClient.start();
//创建分布式锁,同一个Znode路径表示同一个锁
this.lock = new InterProcessMutex(zookeeperClient, ZNODE_PATH_FOR_LOCK);
}
加锁过程:
public boolean lock() {
try {
// 10000ms 为等待时间,该方法在返回之前会阻塞线程,成功获得锁之后会返回 true,否则返回 false
return this.lock.acquire(10000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
解锁方法:
public void unlock() {
try {
if (this.lock.isAcquiredInThisProcess()) {
this.lock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
zookeeper的分布式锁使用相同的路径,代表相同的锁,比如上例子中的 /lock_for_user_submit。其家加解锁的过程如下:
每词取锁请求会在指定的锁节点下创建一个新的有序节点,登录zookeepr之后可以查看已经创建的有序子节点:
[zk: localhost:2181(CONNECTED) 28] ls /lock_for_user_submit [_c_df74c204-80fa-45ff-9015-0033a4c62cdf-lock-0000000024,\ c1cd1c9e9-ea4d-443b-b0d8-95458b0c0738-lock-0000000025, \ c2522d2ae-57a7-43fb-ba10-773cfa962b09-lock-0000000026]
判断是否获得锁时,会获得锁节点(例子中的/lock_for_user_submit)下的所有子节点,如果当前线程创建的子节点是序号最小的子节点,则表示已经获得锁。否则,阻塞线程,并监听最小接节点变化。
当释放锁锁时,会删除锁节点下对应的序号最小子节点,并通知其余处于监听状态的线程重新尝试获取锁。
(补充Curator的Lock其他用法:参考:http://www.aboutyun.com/thread-10725-1-1.html)
总结一下:
基于数据库的分布式锁比较容易实现,而不需要维护额外的中间件,但存在单点问题,数据库宕机会拖累业务,而且频繁数据库操作性能不佳, 加锁期间或占用数据路链接,影响其他数据库操作。基于缓存的方式性能更好,实现起来门槛也并不高,实际生产环境中的缓存也非常广泛。
Zookeeper分布式锁可靠性更高,Zookeeper集群具有多级冗余的灾备能力,避免单点问题,有序节点可以保证过个请求获得锁的公平性。缺点是锁节点的创建、删除和节点监听会有一定额性能开销,从效率和维护成本上略重。
总体而言,有限选择基于缓存的方式,在实际生产环境下,多数产品都会使用缓存,随意具备一定的环境基础。如果是小规模的临时需求,手头有没有可用的缓存服务器,可以用基于数据表的方式来实现。如果你想追求更高的可靠性,且有精力维护一个zookeeper集群,可以使用zookeeper方案。