您现在的位置是:亿华云 > 热点
分布式锁实现原理与最佳实践
亿华云2025-10-03 06:50:34【热点】0人已围观
简介来源:阿里云开发者阿里妹导读在单体的应用开发场景中涉及并发同步时,大家往往采用Synchronized同步)或同一个JVM内Lock机制来解决多线程间的同步问题。而在分布式集群工作的开发场景中,就需要
来源:阿里云开发者
阿里妹导读
在单体的应用开发场景中涉及并发同步时,大家往往采用Synchronized(同步)或同一个JVM内Lock机制来解决多线程间的式锁实现实践同步问题。而在分布式集群工作的原理开发场景中,就需要一种更加高级的最佳锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的分布锁就是分布式锁。接下来本文将为大家分享分布式锁的式锁实现实践最佳实践。一、原理超卖问题复现
1.1 现象
存在如下的最佳几张表:商品表



1.2 解决办法
从上面造成问题的原因来看,只要是扣减库存的动作,不是原子性的。多个线程同时操作就会有问题。单体应用:使用本地锁 + 数据库中的行锁解决
分布式应用:
使用数据库中的乐观锁,加一个 version 字段,香港云服务器利用CAS来实现,会导致大量的 update 失败
使用数据库维护一张锁的表 + 悲观锁 select,使用 select for update 实现
使用Redis 的 setNX实现分布式锁
使用zookeeper的watcher + 有序临时节点来实现可阻塞的分布式锁
使用Redisson框架内的分布式锁来实现
使用curator 框架内的分布式锁来实现二、单体应用解决超卖的问题
正确示例:将事务包含在锁的控制范围内
保证在锁释放之前,事务已经提交。//@Transactional(rollbackFor = Exception.class)public synchronized Long createOrder() throws Exception { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); Product product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null) { platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:" + purchaseProductId + "不存在"); } //商品当前库存 Integer currentCount = product.getCount(); //校验库存 if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买"); } productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId()); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); orderItem.setOrderId(order.getId()); // ... 省略 Set return order.getId(); platformTransactionManager.commit(transaction1);}正确示例:使用synchronized的代码块
public Long createOrder() throws Exception { Product product = null; //synchronized (this) { //synchronized (object) { synchronized (DBOrderService2.class) { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null) { platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:" + purchaseProductId + "不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount); //校验库存 if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买"); } productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId()); platformTransactionManager.commit(transaction1); } TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); // ... 省略 Set orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction2); return order.getId();正确示例:使用Lock
private Lock lock = new ReentrantLock();public Long createOrder() throws Exception{ Product product = null; lock.lock(); TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); try { product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ throw new Exception("购买商品:"+purchaseProductId+"不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount); //校验库存 if (purchaseProductNum > currentCount){ throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买"); } productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId()); platformTransactionManager.commit(transaction1); } catch (Exception e) { platformTransactionManager.rollback(transaction1); } finally { // 注意抛异常的时候锁释放不掉,分布式锁也一样,都要在这里删掉 lock.unlock(); } TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); // ... 省略 Set orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); // ... 省略 Set orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction); return order.getId();}三、常见分布式锁的使用
上面使用的方法只能解决单体项目,当部署多台机器的时候就会失效,因为锁本身就是单机的锁,所以需要使用分布式锁来实现。3.1 数据库乐观锁
数据库中的乐观锁,服务器租用加一个version字段,利用CAS来实现,乐观锁的方式支持多台机器并发安全。但是并发量大的时候会导致大量的update失败3.2 数据库分布式锁
db操作性能较差,并且有锁表的风险,一般不考虑。3.2.1 简单的数据库锁

3.3 Redis setNx
Redis 原生支持的,保证只有一个会话可以设置成功,因为Redis自己就是单线程串行执行的。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>spring.redis.host=localhost封装一个锁对象:@Slf4jpublic class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; //单位:秒 private int expireTime; /** * 没有传递 value,因为直接使用的是随机值 */ public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime=expireTime; this.value = UUID.randomUUID().toString(); } /** * JDK 1.7 之后的自动关闭的功能 */ @Override public void close() throws Exception { unLock(); } /** * 获取分布式锁 * SET resource_name my_random_value NX PX 30000 * 每一个线程对应的随机值 my_random_value 不一样,用于释放锁的时候校验 * NX 表示 key 不存在的时候成功,key 存在的时候设置不成功,Redis 自己是单线程,串行执行的,第一个执行的才可以设置成功 * PX 表示过期时间,没有设置的话,忘记删除,就会永远不过期 */ public boolean getLock(){ RedisCallback<Boolean> redisCallback = connection -> { //设置NX RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //设置过期时间 Expiration expiration = Expiration.seconds(expireTime); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); //序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } /** * 释放锁的时候随机数相同的时候才可以释放,避免释放了别人设置的锁(自己的已经过期了所以别人才可以设置成功) * 释放的时候采用 LUA 脚本,因为 delete 没有原生支持删除的时候校验值,证明是当前线程设置进去的值 * 脚本是在官方文档里面有的 */ public boolean unLock() { // key 是自己才可以释放,不是就不能释放别人的锁 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); // 执行脚本的时候传递的 value 就是对应的值 Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:"+result); return result; }}每次获取的时候,自己线程需要new对应的RedisLock:public String redisLock(){ log.info("我进入了方法!"); try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){ if (redisLock.getLock()) { log.info("我进入了锁!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成";}3.4 zookeeper 瞬时znode节点 + watcher监听机制
临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;
其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;
下一个序号的线程得到通知,继续执行;
以此类推,创建节点的时候,就确认了线程执行的顺序。
zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:
当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:/** * 自己本身就是一个 watcher,可以得到通知 * AutoCloseable 实现自动关闭,资源不使用的时候 */@Slf4jpublic class ZkLock implements AutoCloseable, Watcher { private ZooKeeper zooKeeper; /** * 记录当前锁的名字 */ private String znode; public ZkLock() throws IOException { this.zooKeeper = new ZooKeeper("localhost:2181", 10000,this); } public boolean getLock(String businessCode) { try { //创建业务 根节点 Stat stat = zooKeeper.exists("/" + businessCode, false); if (stat==null){ zooKeeper.create("/" + businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建瞬时有序节点 /order/order_00000001 znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取业务节点下 所有的子节点 List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false); //获取序号最小的(第一个)子节点 Collections.sort(childrenNodes); String firstNode = childrenNodes.get(0); //如果创建的节点是第一个子节点,则获得锁 if (znode.endsWith(firstNode)){ return true; } //如果不是第一个子节点,则监听前一个节点 String lastNode = firstNode; for (String node:childrenNodes){ if (znode.endsWith(node)){ zooKeeper.exists("/"+businessCode+"/"+lastNode,true); break; }else { lastNode = node; } } synchronized (this){ wait(); } return true; } catch (Exception e) { e.printStackTrace(); } return false; } @Override public void close() throws Exception { zooKeeper.delete(znode,-1); zooKeeper.close(); log.info("我已经释放了锁!"); } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } }}3.5 zookeeper curator
在实际的开发中,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入锁。<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> <exclusions> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions></dependency>@Bean(initMethod="start",destroyMethod = "close")public CuratorFramework getCuratorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory. newClient("localhost:2181", retryPolicy); return client;}框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。
官网中分布式锁的实现是在curator-recipes依赖中,不要引用错了。@Autowiredprivate CuratorFramework client;@Testpublic void testCuratorLock(){ InterProcessMutex lock = new InterProcessMutex(client, "/order"); try { if ( lock.acquire(30, TimeUnit.SECONDS) ) { try { log.info("我获得了锁!!!"); } finally { lock.release(); } } } catch (Exception e) { e.printStackTrace(); } client.close();}3.6 Redission
重新实现了Java并发包下处理并发的类,让其可以跨JVM使用,例如CHM等。3.6.1 非SpringBoot项目引入https://redisson.org/引入Redisson的依赖,然后配置对应的XML即可:<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.2</version> <exclusions> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions></dependency>编写相应的redisson.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:redisson="http://redisson.org/schema/redisson" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://redisson.org/schema/redisson http://redisson.org/schema/redisson/redisson.xsd"> <redisson:client> <redisson:single-server address="redis://127.0.0.1:6379"/> </redisson:client></beans>配置对应@ImportResource("classpath*:redisson.xml")资源文件。
3.6.2 SpringBoot项目引入或者直接使用springBoot的starter即可。https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.19.1</version></dependency>修改application.properties即可:#spring.redis.host=3.6.3 设置配置类@Beanpublic RedissonClient getRedissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); return Redisson.create(config);}3.6.4 使用@Testpublic void testRedissonLock() { RLock rLock = redisson.getLock("order"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("我获得了锁!!!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("我释放了锁!!"); rLock.unlock(); }}3.7 Etcd
参考以下文章,普通项目不会为了一把锁引入etcd,此处不再赘述:https://time.geekbang.org/column/article/350285四、常见分布式锁的原理
4.1 Redisson
Redis 2.6之后才可以执行lua脚本,比起管道而言,这是原子性的,模拟一个商品减库存的原子操作://lua脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10jedis.set("product_stock_10016", "15"); //初始化商品10016的库存String script = " local count = redis.call(get, KEYS[1]) " + " local a = tonumber(count) " + " local b = tonumber(ARGV[1]) " + " if a >= b then " + " redis.call(set, KEYS[1], a-b) " + " return 1 " + " end " + " return 0 ";Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));System.out.println(obj);












4.2 RedLock解决非单体项目的Redis主从架构的锁失效
https://redis.io/docs/manual/patterns/distributed-locks/查看Redis官方文档,对于单节点的Redis ,使用setnx和lua del删除分布式锁是足够的,但是主从架构的场景下:锁先加在一个master节点上,默认是异步同步到从节点,此时master挂了会选择slave为master,此时又可以加锁,就会导致超卖。但是如果使用zookeeper来实现的话,由于zk是CP的,所以CP不存在这样的问题。Redis文档中给出了RedLock的解决办法,使用redLock真的可以解决吗?4.2.1 RedLock 原理基于客户端的实现,是基于多个独立的Redis Master节点的一种实现(一般为5)。client依次向各个节点申请锁,若能从多数个节点中申请锁成功并满足一些条件限制,那么client就能获取锁成功。它通过独立的N个Master节点,避免了使用主备异步复制协议的缺陷,只要多数Redis节点正常就能正常工作,显著提升了分布式锁的安全性、可用性。
获取锁
获取当前时间T1,作为后续的计时依据;
按顺序地,依次向5个独立的节点来尝试获取锁 SET resource_name my_random_value NX PX 30000;
计算获取锁总共花了多少时间,判断获取锁成功与否;
时间:T2-T1;
多数节点的锁(N/2+1);
当获取锁成功后的有效时间,要从初始的时间减去第三步算出来的消耗时间;
如果没能获取锁成功,尽快释放掉锁。
释放锁
向所有节点发起释放锁的操作,不管这些节点有没有成功设置过。public String redlock() { String lockKey = "product_001"; //这里需要自己实例化不同redis实例的redisson客户端连接,这里只是伪代码用一个redisson客户端简化了 RLock lock1 = redisson.getLock(lockKey); RLock lock2 = redisson.getLock(lockKey); RLock lock3 = redisson.getLock(lockKey); /** * 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里) */ RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); try { /** * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完) */ boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS); if (res) { //成功获得锁,在这里处理业务 } } catch (Exception e) { throw new RuntimeException("lock fail"); } finally { //无论如何, 最后都要解锁 redLock.unlock(); } return "end";}但是,它的实现建立在一个不安全的系统模型上的,它依赖系统时间,当时钟发生跳跃时,也可能会出现安全性问题。分布式存储专家Martin对RedLock的分析文章,Redis作者的也专门写了一篇文章进行了反驳。
Martin Kleppmann:How to do distributed locking
https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.htmlAntirez:Is Redlock safe?
http://antirez.com/news/1014.2.2 RedLock 问题一:持久化机制导致重复加锁如果是上面的架构图,一般生产都不会配置AOF的每一条命令都落磁盘,一般会设置一些间隔时间,比如1s,如果ABC节点加锁成功,有一个节点C恰好是在1s内加锁,还没有落盘,此时挂了,就会导致其他客户端通过CDE又会加锁成功。4.2.3 RedLock 问题二:主从下重复加锁
4.3 Curator
InterProcessMutex 可重入锁的分析
五、业务中使用分布式锁的注意点
获取的锁要设置有效期,假设我们未设置key自动过期时间,在Set key value NX 后,如果程序crash或者发生网络分区后无法与Redis节点通信,毫无疑问其他 client 将永远无法获得锁,这将导致死锁,服务出现中断。SETNX和EXPIRE命令去设置key和过期时间,这也是不正确的,因为你无法保证SETNX和EXPIRE命令的原子性。自己使用 setnx 实现Redis锁的时候,注意并发情况下不要释放掉别人的锁(业务逻辑执行时间超过锁的过期时间),导致恶性循环。一般:1)加锁的时候需要指定value的内容是当前进程中的当前线程的唯一标记,不要使用线程ID作为当前线程的锁的标记,因为不同实例上的线程ID可能是一样的。2)释放锁的逻辑会写在finally ,释放锁时候要判断锁对应的value,而且要使用lua脚本实现原子 del 操作。因为if逻辑判断完之后也可能失效导致删除别人的锁。
3)针对扣减库存这个逻辑,lua脚本里面实现Redis比较库存、扣减库存操作的原子性。通过判断Redis Decr命令的返回值即可。此命令会返回扣减后的最新库存,若小于0则表示超卖。
5.1 自己实现分布式锁的坑
setnx不关心锁的顺序导致删除别人的锁锁失效之后,别人加锁成功,自己把别人的锁删了。我们无法预估程序执行需要的锁的时间。public String deductStock() { String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deltaqin"); stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end";}setnx关心锁的顺序还是删除了别人的锁并发会卡在各种地方,卡住的时候过期了,就会删掉别人加的锁:错误的原因还是因为解锁的逻辑不是原子性的,这里可以参考Redisson的解锁逻辑使用lua脚本实现。public String deductStock() { String lockKey = "lock:product_101"; String clientId = UUID.randomUUID().toString(); Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v) if (!result) { return "error_code"; } try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { // 卡在这里,锁过期了,其他线程又可以加锁,此时又把其他线程新加的锁删掉了 stringRedisTemplate.delete(lockKey); } } return "end";}解决办法这种问题解决的办法就是使用锁续命,比如使用一个定时任务间隔小于锁的超时时间,每隔一段时间就给锁续命,除非线程自己主动删除。这也是Redisson的实现思路。5.2 锁优化:分段加锁逻辑
针对一个商品,要开启秒杀的时候,会将商品的库存预先加载到Redis缓存中,比如有100个库存,此时可以分为5个key,每一个key有20个库存。可以把分布式锁的性能提升5倍。例如:product_10111_stock = 100
product_10111_stock1 = 20
product_10111_stock2 = 20
product_10111_stock3 = 20
product_10111_stock4 = 20
product_10111_stock5 = 20请求来了可以随机可以轮询,扣减完之后就标记不要下次再分配到这个库存。六、分布式锁的真相与选择
6.1 分布式锁的真相
需要满足的几个特性互斥:不同线程、进程互斥。
超时机制:临界区代码耗时导致,网络原因导致。可以使用额外的线程续命保证。
完备的锁接口:阻塞的和非阻塞的接口都要有,lock和tryLock。
可重入性:当前请求的节点+ 线程唯一标识。
公平性:锁唤醒时候,按照顺序唤醒。
正确性:进程内的锁不会因为报错死锁,因为崩溃的时候整个进程都会结束。但是多实例部署时死锁就很容易发生,如果粗暴使用超时机制解决死锁问题,就默认了下面这个假设:
锁的超时时间 >> 获取锁的时延 + 执行临界区代码的时间 + 各种进程的暂停(比如 GC)
但上述假设其实无法保证的。将分布式锁定位为,可以容忍非常小概率互斥语义失效场景下的锁服务。一般来说,一个分布式锁服务,它的正确性要求越高,性能可能就会越低。6.2 分布式锁的选择
数据库:db操作性能较差,并且有锁表的风险,一般不考虑。
优点:实现简单、易于理解
缺点:对数据库压力大
Redis:适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
优点:易于理解
缺点:自己实现、不支持阻塞
Redisson:相对于Jedis其实更多用在分布式的场景。
优点:提供锁的方法,可阻塞
Zookeeper:适用于高可靠(高可用),而并发量不是太高的场景。
优点:支持阻塞
缺点:需理解Zookeeper、程序复杂
Curator
优点:提供锁的方法
缺点:Zookeeper,强一致,慢
Etcd:安全和可靠性上有保证,但是比较重。不推荐自己编写的分布式锁,推荐使用Redisson和Curator实现的分布式锁。很赞哦!(58)
上一篇: 为什么数据中心温湿度监控至关重要?