侧边栏壁纸
博主头像
拾荒的小海螺博主等级

只有想不到的,没有做不到的

  • 累计撰写 140 篇文章
  • 累计创建 15 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

架构师:使用 Zookeeper 实现分布式锁的技术指南

拾荒的小海螺
2024-09-20 / 0 评论 / 0 点赞 / 4 阅读 / 8596 字

1、简述

在分布式系统中,多个节点可能需要访问共享资源或执行需要互斥的操作,为了避免竞争导致数据不一致或资源争用,我们需要一种机制来协调各个节点对资源的访问。分布式锁是用于解决这种竞争问题的关键技术,它确保在同一时间只有一个节点能够访问或修改共享资源。

image-rbwv.png

2、Zookeeper 与分布式锁

Zookeeper 是一个开源的分布式协调服务,主要用于提供分布式数据一致性和协调功能。Zookeeper 本身通过其强一致性、顺序一致性和高可用性特性,成为实现分布式锁的常用工具之一。Zookeeper 通过其临时有序节点以及watcher 机制,可以有效地实现分布式锁的功能。

2.1 为什么使用 Zookeeper 实现分布式锁?

  • 可靠性高:Zookeeper 提供了一致性、可用性和分区容忍性的保证,确保锁服务在节点宕机或网络分区时仍能正常工作。
  • 公平性:通过有序节点的机制,Zookeeper 实现的分布式锁可以保证多个客户端获取锁的顺序是按照请求的顺序,具备公平性。
  • 自动释放锁:通过创建临时节点,客户端断开时,锁会自动释放,避免死锁情况。

2.2 实现分布式锁的基本原理

Zookeeper 实现分布式锁的核心思想是通过创建临时顺序节点来进行锁的获取与释放。具体步骤如下:

  • 创建锁节点:每个客户端在锁的目录下创建一个带有顺序标志的临时节点。
  • 获取锁:客户端获取锁的条件是,自己创建的节点是锁目录中所有节点里序号最小的。如果是最小的节点,代表获取到锁。
  • 监听前驱节点:如果客户端不是最小的节点,则监听比自己序号小的上一个节点的删除事件。
  • 释放锁:当客户端执行完临界区代码后,删除自己创建的临时节点,其他客户端会收到通知,然后重新判断自己是否可以获取锁。

3、集成Curator样例实现

以下是基于 Java 的 Zookeeper 客户端(Apache Curator)实现分布式锁的一个简单示例:

3.1 引入依赖

在 Maven 项目中,需要引入 Apache Curator 依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version> <!-- 确保使用最新稳定版本 -->
</dependency>

3.2 实现分布式锁

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.CuratorFrameworkFactory;

public class ZookeeperDistributedLock {

    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private static final String LOCK_PATH = "/distributed_lock";
    private static final int SESSION_TIMEOUT = 5000;

    public static void main(String[] args) {
        // 创建Zookeeper客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, 
                                        new ExponentialBackoffRetry(1000, 3));
        client.start();

        // 创建分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);

        try {
            // 尝试获取锁
            if (lock.acquire(5000, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                try {
                    System.out.println("获取到锁,开始执行任务...");
                    // 在此执行需要互斥的业务操作
                    Thread.sleep(3000); // 模拟任务执行
                } finally {
                    // 释放锁
                    lock.release();
                    System.out.println("任务执行完毕,释放锁");
                }
            } else {
                System.out.println("未能获取到锁,任务取消");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

代码解析:

  • CuratorFramework:使用 Curator 框架来简化 Zookeeper 的操作,它封装了底层 API,提供了更简单的接口和高层次的实现,如分布式锁等。
  • InterProcessMutex:Curator 提供的分布式可重入互斥锁,它确保同一时刻只有一个客户端可以获取到锁。
  • acquire() 方法:尝试在一定时间内获取锁。如果超时未获取到锁,则放弃。
  • release() 方法:释放锁,允许其他等待的客户端获取锁。

4、利用节点实现

利用 Zookeeper 的临时顺序节点可以实现分布式锁,其原理是在 Zookeeper 的某个特定路径下创建临时有序节点,通过节点的顺序来确定谁持有锁。以下是如何通过 Zookeeper 节点来实现分布式锁的代码示例和解释。

4.1 实现分布式锁的步骤

  • 创建节点:每个客户端在锁的目录下(例如 /locks)创建一个带有顺序的临时节点。
  • 获取锁:客户端获取锁的条件是,自己创建的节点是该目录下所有节点中序号最小的。
  • 监听前驱节点:如果不是最小节点,客户端将会监听比自己小的节点的删除事件。
  • 释放锁:当锁的持有者任务完成时,删除自己的节点,其他节点收到通知后可以尝试获取锁。

4.2 样例代码

下面是使用 Zookeeper 实现分布式锁的 Java 代码示例,基于 Zookeeper 原生 API 实现:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class ZookeeperDistributedLock {

    private ZooKeeper zooKeeper;
    private String lockRoot = "/locks";  // 锁根节点
    private String lockNode;  // 当前客户端创建的节点
    private String lockPath;  // 当前请求锁路径

    public ZookeeperDistributedLock(String connectString) throws IOException {
        zooKeeper = new ZooKeeper(connectString, 3000, event -> {});
        try {
            Stat stat = zooKeeper.exists(lockRoot, false);
            if (stat == null) {
                zooKeeper.create(lockRoot, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 尝试获取锁
    public void acquireLock() throws KeeperException, InterruptedException {
        // 创建一个临时有序节点
        lockPath = zooKeeper.create(lockRoot + "/lock_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created lock node: " + lockPath);

        // 获取所有的锁节点,检查当前节点是否是最小的节点
        List<String> nodes = zooKeeper.getChildren(lockRoot, false);
        Collections.sort(nodes);

        String nodeName = lockPath.substring(lockRoot.length() + 1);
        int index = nodes.indexOf(nodeName);
        if (index == 0) {
            System.out.println("获取到锁: " + lockPath);
            return;
        }

        // 如果当前节点不是最小的,监听前一个节点
        String prevNode = nodes.get(index - 1);
        Stat stat = zooKeeper.exists(lockRoot + "/" + prevNode, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                System.out.println("节点 " + prevNode + " 已删除,尝试获取锁");
                try {
                    acquireLock();
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        if (stat == null) {
            acquireLock();
        }
    }

    // 释放锁
    public void releaseLock() throws KeeperException, InterruptedException {
        zooKeeper.delete(lockPath, -1);
        System.out.println("释放锁: " + lockPath);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZookeeperDistributedLock lock = new ZookeeperDistributedLock("localhost:2181");

        lock.acquireLock();
        // 模拟临界区代码
        Thread.sleep(5000);
        lock.releaseLock();
    }
}

代码说明

  • Zookeeper 客户端连接:通过 ZooKeeper 类实例化一个 Zookeeper 客户端,并连接到 Zookeeper 服务。
  • 创建临时有序节点:在 /locks 目录下创建带有顺序标识的临时节点,节点的名称为 lock_ 开头,并附加一个递增的顺序编号。
  • 排序节点:获取所有的锁节点,并根据顺序排序,判断自己是否是最小节点。
  • 监听前驱节点:如果不是最小节点,则监听前驱节点的删除事件,收到删除通知后,尝试再次获取锁。
  • 释放锁:在任务执行完成后,删除自己的节点,以便其他客户端可以获取锁。

5、场景应用

  • 分布式任务调度:多个分布式节点尝试执行同一任务时,通过 Zookeeper 分布式锁,确保每个时刻只有一个节点在执行任务。
  • 资源互斥访问:当多个分布式服务需要访问同一个共享资源(如数据库表或文件)时,使用分布式锁来确保只有一个节点能访问资源,避免数据不一致。
  • 库存系统中的并发控制:在电商系统中,库存需要并发控制,使用分布式锁确保同一时间只有一个节点可以修改库存数据,避免超卖问题。
  • 分布式事务协调:在分布式系统中协调多个事务的提交或回滚,使用分布式锁来保证多个服务实例中的事务协调操作不会冲突。

6、总结

通过 Zookeeper 的临时有序节点机制,分布式系统中的多个节点可以安全有效地争抢资源,实现分布式锁。Zookeeper 实现的分布式锁具有高可靠性和公平性,适合用于多种分布式场景,如任务调度、资源访问控制等。

0

评论区