侧边栏壁纸
博主头像
拾荒的小海螺博主等级

只有想不到的,没有做不到的

  • 累计撰写 194 篇文章
  • 累计创建 19 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

架构师:实现接口幂等性机制的设计指南

拾荒的小海螺
2024-10-08 / 0 评论 / 0 点赞 / 10 阅读 / 18059 字

1、简述

在计算机科学中,幂等性是指一次或多次执行某个操作的结果是一致的,即无论请求多少次,系统的状态不会发生改变。换句话说,幂等操作意味着执行一次或多次操作的效果是相同的。

在分布式系统中,幂等性非常重要,因为网络延迟、失败重试等情况时有发生,可能会导致某些请求被重复提交。如果操作不是幂等的,重复提交的请求可能会造成系统状态不一致,甚至带来严重的数据问题。因此,在构建接口或服务时,保证某些操作的幂等性是至关重要的。

1728354956900.jpg

2、常见的幂等性设计

2.1 使用唯一标识(Token)机制

在客户端每次请求时生成一个唯一的 Token,服务器端接收到请求时,首先校验 Token,如果该 Token 已经处理过,则直接返回已有结果,不再处理。如果 Token 是新的,则正常处理请求,并在处理后将 Token 保存起来,避免同一请求被重复处理。

import java.util.HashSet;
import java.util.Set;

public class IdempotencyTokenService {

    // 用于保存已处理过的 token
    private static final Set<String> processedTokens = new HashSet<>();

    // 模拟处理请求的方法
    public static String processRequest(String token, String data) {
        if (processedTokens.contains(token)) {
            return "Request already processed with token: " + token;
        }
    
        // 处理请求的逻辑
        String result = "Processing data: " + data;
    
        // 保存已处理的 token
        processedTokens.add(token);
    
        return result;
    }

    public static void main(String[] args) {
        String token1 = "abc123";
        String token2 = "def456";

        // 第一次请求
        System.out.println(processRequest(token1, "data1")); // 正常处理

        // 重复请求
        System.out.println(processRequest(token1, "data1")); // 已处理,直接返回

        // 新的请求
        System.out.println(processRequest(token2, "data2")); // 正常处理
    }
}

优点:

  • 简单易实现
  • 适用于请求次数较少的场景

缺点:

  • 需要维护 Token,在大规模分布式场景下,可能需要引入 Redis 等分布式存储。

2.2 数据库唯一约束

通过数据库表的唯一索引约束来保证同一操作不会重复插入。比如在订单系统中,可以对订单的 orderId 添加唯一索引,这样即使多次提交相同的 orderId,也只会插入一次。假设有一个订单表 orders,其 order_id 为唯一约束:

CREATE TABLE orders (
    id INT AUTO_INCREMENT PRIMARY KEY,
    order_id VARCHAR(50) NOT NULL UNIQUE,
    product VARCHAR(100),
    amount DECIMAL(10, 2)
);

在 Java 中处理时可以捕获唯一索引冲突的异常,避免重复插入:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class OrderService {

    public static void placeOrder(String orderId, String product, double amount) {
        String url = "jdbc:mysql://localhost:3306/mydb";
        String user = "root";
        String password = "password";

        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            String sql = "INSERT INTO orders (order_id, product, amount) VALUES (?, ?, ?)";
            PreparedStatement stmt = conn.prepareStatement(sql);
            stmt.setString(1, orderId);
            stmt.setString(2, product);
            stmt.setDouble(3, amount);
            stmt.executeUpdate();
            System.out.println("Order placed successfully!");
        } catch (SQLException e) {
            if (e.getErrorCode() == 1062) {
                System.out.println("Order already exists with orderId: " + orderId);
            } else {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 第一次下单
        placeOrder("ORDER123", "Laptop", 1200.00);

        // 重复下单
        placeOrder("ORDER123", "Laptop", 1200.00);
    }
}

优点:

  • 数据库保证唯一性,避免多次插入。
  • 实现简单,不依赖外部系统。

缺点:

  • 需要数据库支持唯一约束。
  • 无法控制幂等请求的响应时间。

2.3 乐观锁

乐观锁通常用于更新操作。每次更新前,先读取一个版本号(或时间戳),然后在更新时检查版本号是否变化。如果版本号没有变化,说明没有其他操作修改过数据,可以执行更新操作。如果版本号已经变化,说明数据已经被修改,当前操作无效。假设有一个用户余额表 user_balance,其结构如下:

CREATE TABLE user_balance (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    balance DECIMAL(10, 2),
    version INT
);

在更新余额时,先读取 version,然后带着 version 更新,如果 version 不匹配,则更新失败。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OptimisticLockService {

    public static void updateBalance(int userId, double amount) {
        String url = "jdbc:mysql://localhost:3306/mydb";
        String user = "root";
        String password = "password";

        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            // 读取用户当前的余额和版本号
            String selectSql = "SELECT balance, version FROM user_balance WHERE user_id = ?";
            PreparedStatement selectStmt = conn.prepareStatement(selectSql);
            selectStmt.setInt(1, userId);
            ResultSet rs = selectStmt.executeQuery();

            if (rs.next()) {
                double balance = rs.getDouble("balance");
                int version = rs.getInt("version");

                // 更新余额,检查版本号
                String updateSql = "UPDATE user_balance SET balance = ?, version = version + 1 WHERE user_id = ? AND version = ?";
                PreparedStatement updateStmt = conn.prepareStatement(updateSql);
                updateStmt.setDouble(1, balance + amount);
                updateStmt.setInt(2, userId);
                updateStmt.setInt(3, version);

                int rowsAffected = updateStmt.executeUpdate();
                if (rowsAffected > 0) {
                    System.out.println("Balance updated successfully!");
                } else {
                    System.out.println("Balance update failed due to version mismatch!");
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        // 模拟更新余额
        updateBalance(1, 100.00);
    }
}

优点:

  • 适用于高并发场景,能有效避免数据被重复更新。

缺点:

  • 实现复杂,需要数据库支持乐观锁。

2.4 状态机

状态机(State Machine)是一种行为模型,它由一组状态和状态之间的转换规则组成。它可以通过不同的输入事件在状态之间进行切换,每次输入事件会触发特定的状态转换。

状态机可以帮助处理多个步骤的业务流程,并确保每个步骤只执行一次,即使同样的请求重复多次也不会影响当前状态,达到幂等性的目的。下面以订单系统为例,使用状态机模式来实现幂等性:

  • 定义订单状态枚举
public enum OrderStatus {
    CREATED,  // 订单已创建
    PAID,     // 订单已支付
    SHIPPED,  // 订单已发货
    COMPLETED // 订单已完成
}
  • 订单类及其状态属性
public class Order {
    private String orderId;
    private OrderStatus status;

    public Order(String orderId) {
        this.orderId = orderId;
        this.status = OrderStatus.CREATED; // 初始状态为已创建
    }

    // 获取订单状态
    public OrderStatus getStatus() {
        return status;
    }

    // 设置订单状态
    public void setStatus(OrderStatus status) {
        this.status = status;
    }

    public String getOrderId() {
        return orderId;
    }
}
  • 状态机服务
public class OrderStateMachine {

    // 模拟订单存储(如持久化数据库)
    private static Map<String, Order> orderMap = new HashMap<>();

    /**
     * 创建订单
     */
    public static void createOrder(String orderId) {
        Order order = new Order(orderId);
        orderMap.put(orderId, order);
        System.out.println("Order created with ID: " + orderId);
    }

    /**
     * 处理支付
     */
    public static void processPayment(String orderId) {
        Order order = orderMap.get(orderId);
        if (order == null) {
            throw new RuntimeException("Order not found");
        }
        if (order.getStatus() != OrderStatus.CREATED) {
            System.out.println("Order has already been paid or is in another state. Current state: " + order.getStatus());
            return;
        }
        // 进行支付处理
        order.setStatus(OrderStatus.PAID);
        System.out.println("Order paid successfully. Current state: " + order.getStatus());
    }

    /**
     * 处理发货
     */
    public static void processShipment(String orderId) {
        Order order = orderMap.get(orderId);
        if (order == null) {
            throw new RuntimeException("Order not found");
        }
        if (order.getStatus() != OrderStatus.PAID) {
            System.out.println("Order cannot be shipped. Current state: " + order.getStatus());
            return;
        }
        // 进行发货处理
        order.setStatus(OrderStatus.SHIPPED);
        System.out.println("Order shipped successfully. Current state: " + order.getStatus());
    }

    /**
     * 完成订单
     */
    public static void completeOrder(String orderId) {
        Order order = orderMap.get(orderId);
        if (order == null) {
            throw new RuntimeException("Order not found");
        }
        if (order.getStatus() != OrderStatus.SHIPPED) {
            System.out.println("Order cannot be completed. Current state: " + order.getStatus());
            return;
        }
        // 完成订单处理
        order.setStatus(OrderStatus.COMPLETED);
        System.out.println("Order completed successfully. Current state: " + order.getStatus());
    }
}
  • 测试用例
public class StateMachineTest {
    public static void main(String[] args) {
        String orderId = "order123";

        // 创建订单
        OrderStateMachine.createOrder(orderId);

        // 处理支付(正常流程)
        OrderStateMachine.processPayment(orderId);

        // 处理发货(正常流程)
        OrderStateMachine.processShipment(orderId);

        // 重复发货请求(将会跳过处理)
        OrderStateMachine.processShipment(orderId);

        // 完成订单
        OrderStateMachine.completeOrder(orderId);

        // 重复支付请求(将会跳过处理)
        OrderStateMachine.processPayment(orderId);
    }
}

优点:

  • 业务流程清晰:状态机可以直观地反映业务流程中的各个状态及其转换条件,有助于理解和维护。
  • 适合多步骤业务:对于涉及多个步骤的业务流程,状态机可以很好地控制各个状态之间的转换,避免业务重复处理。
  • 幂等性自然实现:通过状态检查,自然避免了重复处理某个步骤。

缺点:

  • 状态复杂性:对于复杂的业务场景,状态机的状态和转换条件可能非常多,增加了实现难度。
  • 状态持久化:需要额外处理状态的持久化问题,例如需要将状态保存在数据库或缓存中。

2.5 悲观锁

悲观锁是一种经典的并发控制机制,通过锁住数据来确保在一个线程对资源进行操作时,其他线程不能访问或修改该资源。悲观锁假设并发操作是非常频繁的,因此对每次操作都加锁,确保线程安全。在数据库层面,悲观锁通常会通过 SQL 语句直接加锁(如 SELECT FOR UPDATE),从而实现数据的串行化处理。

假设我们有一个订单系统,在用户支付时,我们希望确保幂等性,即同一订单只能成功支付一次。我们可以通过悲观锁来实现这一点,防止并发操作。

  • 订单表结构
CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id VARCHAR(255) UNIQUE,
    status VARCHAR(50),  -- 订单状态,如 CREATED, PAID, SHIPPED, etc.
    amount DECIMAL(10, 2)
);
  • 实现支付操作
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OrderService {

    private Connection connection; // 通过连接池获取的数据库连接

    public OrderService(Connection connection) {
        this.connection = connection;
    }

    /**
     * 处理订单支付操作,使用悲观锁确保幂等性
     * @param orderId 订单ID
     * @throws SQLException 数据库操作异常
     */
    public void processPayment(String orderId) throws SQLException {
        // 开启数据库事务
        connection.setAutoCommit(false);
        try {
            // 查询订单状态并锁定行(FOR UPDATE)
            String query = "SELECT * FROM orders WHERE order_id = ? FOR UPDATE";
            PreparedStatement statement = connection.prepareStatement(query);
            statement.setString(1, orderId);
            ResultSet resultSet = statement.executeQuery();

            if (!resultSet.next()) {
                throw new RuntimeException("Order not found");
            }

            String status = resultSet.getString("status");
            if ("PAID".equals(status)) {
                System.out.println("Order has already been paid, skipping payment.");
                return; // 如果订单已经支付,直接返回
            }

            // 处理支付逻辑
            System.out.println("Processing payment for order: " + orderId);
            // 支付成功后更新订单状态为 PAID
            String updateQuery = "UPDATE orders SET status = ? WHERE order_id = ?";
            PreparedStatement updateStatement = connection.prepareStatement(updateQuery);
            updateStatement.setString(1, "PAID");
            updateStatement.setString(2, orderId);
            updateStatement.executeUpdate();

            // 提交事务
            connection.commit();
            System.out.println("Payment processed successfully for order: " + orderId);

        } catch (SQLException e) {
            // 如果出现异常,回滚事务
            connection.rollback();
            throw new RuntimeException("Payment failed, transaction rolled back.", e);
        } finally {
            connection.setAutoCommit(true); // 恢复自动提交
        }
    }
}

优点

  • 数据一致性强:悲观锁保证了数据的一致性,即在一个线程操作数据时,其他线程无法访问该数据,确保了幂等性。
  • 适合高冲突场景:在高并发的场景下,悲观锁可以避免数据被多个线程同时修改,特别适合资源冲突严重的场景。

缺点

  • 性能问题:由于悲观锁会锁住数据,其他线程在等待锁的过程中无法继续操作,这会导致性能下降,特别是在高并发的情况下。
  • 死锁风险:如果多个事务相互等待锁,可能会导致死锁问题,需要额外处理。

2.6 分布式锁

在高并发的分布式系统中,使用分布式锁来实现幂等性是一种常见且有效的方式。分布式锁的核心思想是确保同一时间内,只有一个线程或请求可以执行某一特定的操作。通过对关键资源进行加锁,避免多次重复操作,从而保证系统的幂等性。

SETNX 是 Redis 中的命令,其含义是 "SET if Not eXists"。该命令的作用是当某个键不存在时设置该键,且设置成功时返回 1。同时,为了防止死锁,还可以结合过期时间 EXPIRE 一起使用:

import redis.clients.jedis.Jedis;

public class RedisDistributedLock {

    private Jedis jedis = new Jedis("localhost");

    /**
     * 尝试获取锁
     * @param lockKey 锁的键
     * @param requestId 请求的唯一标识,用于释放锁
     * @param expireTime 锁的过期时间,单位是秒
     * @return 是否成功获取锁
     */
    public boolean tryGetLock(String lockKey, String requestId, int expireTime) {
        // SETNX + EXPIRE 来获取锁并设置过期时间
        String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
        return "OK".equals(result);
    }

    /**
     * 释放锁
     * @param lockKey 锁的键
     * @param requestId 请求的唯一标识,只有匹配该标识的锁才能被释放
     * @return 是否成功释放锁
     */
    public boolean releaseLock(String lockKey, String requestId) {
        // 通过 Lua 脚本来确保释放的锁是当前持有的锁
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                           "return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(luaScript, 1, lockKey, requestId);
        return "1".equals(result.toString());
    }

    public static void main(String[] args) {
        RedisDistributedLock lockService = new RedisDistributedLock();

        String lockKey = "order_lock";
        String requestId = "unique_request_123"; // 用来标识请求的唯一 ID
        int expireTime = 10; // 锁的过期时间为10秒

        // 尝试获取锁
        if (lockService.tryGetLock(lockKey, requestId, expireTime)) {
            try {
                // 获取锁成功,执行幂等操作
                System.out.println("Lock acquired, processing request...");
                // 模拟业务处理逻辑
                Thread.sleep(5000); // 模拟耗时操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放锁
                boolean isReleased = lockService.releaseLock(lockKey, requestId);
                if (isReleased) {
                    System.out.println("Lock released.");
                } else {
                    System.out.println("Failed to release lock.");
                }
            }
        } else {
            System.out.println("Failed to acquire lock, another process is handling the request.");
        }
    }
}

优点

  • 高效:Redis 作为内存数据库,读写性能极高,能够快速加锁和解锁,适合高并发场景。
  • 可扩展性强:Redis 是分布式缓存,可以轻松扩展以支持大规模的分布式系统。
  • 简单易实现:只需 Redis 集群或单实例即可实现,无需复杂的中间件支持。

缺点

  • 可能存在锁失效问题:如果业务执行时间超过锁的过期时间,锁可能会提前释放,导致其他请求获取到锁,从而出- 现并发问题。因此,在设计锁的过期时间时,需要充分考虑业务的执行时间。
  • 锁超时问题:如果不设置合理的锁过期时间,或者锁释放不及时,可能会导致死锁问题。
  • Redis 单点问题:如果使用单点 Redis,可能会出现单点故障问题,建议使用 Redis 集群或 Redis Sentinel 保证高可用性。

3、应用场景

  • 支付系统:重复提交支付请求时,支付结果应相同,不能因为重复提交导致多次扣款。
  • 订单系统:重复提交订单请求时,应该只创建一条订单记录。
  • 短信发送:同样的短信发送请求多次时,应该只发送一条短信。
  • 库存系统:在并发扣减库存时,使用分布式锁防止多次扣减,确保库存的一致性。

4、总结

幂等性在系统设计中是一个重要的概念,尤其是在分布式系统中,避免请求重复处理非常关键。通过 Token 机制、数据库唯一约束、乐观锁、分布式锁及 Redis 等方式,可以有效地实现幂等性,保证系统的稳定性和一致性。

具体使用哪种方式取决于业务场景的需求和系统的架构设计。如果是支付、订单等关键业务,可以考虑 Token 机制和数据库约束,而在高并发环境中,Redis 和乐观锁也是常见的解决方案。

0

评论区