JAVA:Spring Boot 集成 Chronicle Queue 高性能日志和消息队列

admin
0
2025-11-14

1、简述

Chronicle Queue 是一款高性能、低延迟的日志和消息队列库,专为低延迟应用设计,特别是在金融系统、交易系统等对数据吞吐和实时性要求极高的场景中广泛使用。它主要通过内存映射文件(memory-mapped files)将数据直接写入磁盘,从而实现了接近于内存的性能,同时提供持久化能力。

核心特性

🔥 超低延迟:Chronicle Queue 使用内存映射文件技术,使得数据的读写性能极其接近内存操作,非常适合对性能要求极高的应用场景。

🔥 持久化:虽然其性能非常接近内存操作,但数据实际存储在磁盘上,确保了数据的持久化。

🔥 无锁设计:Chronicle Queue 是无锁设计,避免了多线程竞争带来的性能损失。

🔥 时序日志:所有数据按时间顺序存储,便于时间回溯和数据重放。

🔥 文件轮换:支持按时间或大小进行日志文件轮换,有助于日志管理和归档。

本文将详细介绍 Chronicle Queue 的使用场景、核心特性,以及如何在 Java 中进行集成,最后通过详细代码示例演示其使用。

image-vqw4.png


2、实践样例

2.1 引入依赖

在 pom.xml 文件中引入 Chronicle Queue 依赖:

<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-queue</artifactId>
    <version>5.21.20</version> <!-- 请使用最新版本 -->
</dependency>

2.2 创建 Queue

Chronicle Queue 使用非常简单,首先你需要创建一个队列实例。队列数据会被写入到磁盘上的文件中:

import net.openhft.chronicle.queue.ChronicleQueue;

public class ChronicleQueueExample {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
            System.out.println("Chronicle Queue 已创建并存储在 queue-dir 文件夹中");
        }
    }
}

以上代码将在 queue-dir 目录中创建一个 Chronicle Queue 队列。

2.3 生产者写入数据

接下来我们要向队列中写入数据,Chronicle Queue 提供了 Appender 用于将消息写入队列。以下代码展示了如何使用生产者写入消息:

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;

public class Producer {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
            ExcerptAppender appender = queue.acquireAppender();
        
            // 写入数据
            for (int i = 0; i < 10; i++) {
                appender.writeText("Message " + i);
                System.out.println("Written: Message " + i);
            }
        }
    }
}

2.4 消费者读取数据

消费者读取数据非常简单,Chronicle Queue 提供了 Tailer 用于读取队列中的消息。以下代码展示了如何读取生产者写入的消息:

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;

public class Consumer {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
            ExcerptTailer tailer = queue.createTailer();
        
            String message;
            while ((message = tailer.readText()) != null) {
                System.out.println("Read: " + message);
            }
        }
    }
}

3、场景运用

Chronicle Queue 除了基本的读写消息之外,还有一些高级用法,能够满足更多复杂的需求和场景。以下介绍几个常见的高级应用:

🔥 高频交易系统:如金融领域中的订单撮合引擎或行情数据系统,要求极低延迟的消息传递。

🔥 日志记录:能够快速记录大量日志,且能根据时间顺序快速读取。

🔥 实时数据传输:需要快速传递数据,且具备高吞吐量的应用。

3.1 自定义对象的序列化和反序列化

Chronicle Queue 默认处理的是字符串类型数据,但我们经常需要传递更复杂的 Java 对象。Chronicle Queue 提供了序列化支持,可以将自定义的对象写入队列,并在消费时进行反序列化。首先,我们需要实现自定义的 Java 对象,并使用 BytesMarshallable 接口来支持序列化:

import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.bytes.BytesOut;
import net.openhft.chronicle.bytes.BytesMarshallable;

public class Order implements BytesMarshallable {
    private String orderId;
    private double amount;

    public Order(String orderId, double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    @Override
    public void writeMarshallable(BytesOut bytes) {
        bytes.writeUtf8(orderId);
        bytes.writeDouble(amount);
    }

    @Override
    public void readMarshallable(BytesIn bytes) {
        this.orderId = bytes.readUtf8();
        this.amount = bytes.readDouble();
    }

    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + '\'' +
                ", amount=" + amount +
                '}';
    }
}

生产者写入自定义对象:

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;

public class CustomObjectProducer {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("object-queue-dir")) {
            ExcerptAppender appender = queue.acquireAppender();
        
            // 写入自定义对象
            for (int i = 0; i < 5; i++) {
                Order order = new Order("Order" + i, 100 + i);
                appender.writeDocument(order);
                System.out.println("Written: " + order);
            }
        }
    }
}

消费者读取自定义对象:

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;

public class CustomObjectConsumer {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("object-queue-dir")) {
            ExcerptTailer tailer = queue.createTailer();
        
            // 读取自定义对象
            Order order;
            while (tailer.readDocument(o -> order = (Order) o)) {
                System.out.println("Read: " + order);
            }
        }
    }
}

3.2 多消费者模式

Chronicle Queue 支持多消费者同时读取队列中的消息。每个消费者都有独立的 Tailer,可以在不同的位置读取队列的数据。通过这种方式,可以实现多个消费者处理不同的数据分片。

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;

public class MultiConsumer {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("multi-consumer-queue")) {
            ExcerptTailer tailer1 = queue.createTailer("consumer1");
            ExcerptTailer tailer2 = queue.createTailer("consumer2");

            String message;
            System.out.println("Consumer 1 reading:");
            while ((message = tailer1.readText()) != null) {
                System.out.println("Consumer 1 read: " + message);
            }

            System.out.println("Consumer 2 reading:");
            while ((message = tailer2.readText()) != null) {
                System.out.println("Consumer 2 read: " + message);
            }
        }
    }
}

在此模式下,不同消费者可以同时消费消息,而不会干扰彼此。

3.3 高级的文件轮换策略

除了按时间轮换文件(如按天、按小时),Chronicle Queue 还支持按文件大小进行轮换。当文件达到设定的大小时,它将自动创建一个新文件来存储后续数据。

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

public class SizeBasedRollingExample {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("size-queue-dir")
                .rollCycle(RollCycles.SMALL_HOURLY).blockSize(1 << 20).build()) {
            ExcerptAppender appender = queue.acquireAppender();
        
            // 写入数据
            for (int i = 0; i < 1000; i++) {
                appender.writeText("Large message " + i);
            }
            System.out.println("Data written with size-based file rolling.");
        }
    }
}

在此示例中,我们使用 RollCycles.SMALL_HOURLY 和 blockSize 设置每个文件的大小限制。

3.4 自定义 RollCycle

Chronicle Queue 支持通过 RollCycle 定制轮换策略。你可以根据业务需求定义自己的轮换规则。

import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.wire.SequenceGenerator;

public enum CustomRollCycle implements RollCycle {
    EVERY_FIVE_MINUTES("yyyyMMdd-HHmm", 60 * 1000 * 5);

    private final String format;
    private final int length;

    CustomRollCycle(String format, int length) {
        this.format = format;
        this.length = length;
    }

    @Override
    public String format() {
        return format;
    }

    @Override
    public int lengthInMillis() {
        return length;
    }

    @Override
    public SequenceGenerator toSequenceGenerator() {
        return null; // 使用默认的序列生成器
    }
}

然后在创建队列时使用这个自定义的 RollCycle。

3.5 Chronical Queue 和 Replication(复制)

在分布式系统中,Chronicle Queue 还支持将数据在多个实例间进行复制。通过 Chronicle Queue Replication,你可以将数据同步到不同的节点,以保证高可用性和数据冗余。

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.replication.TcpReplicationHub;

public class ReplicatedQueueExample {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("replicated-queue")) {
            TcpReplicationHub hub = TcpReplicationHub.builder().host("localhost").port(8080).build();

            // 配置队列为复制模式
            queue.withReplication(hub.createReplicationChannel(1));
        
            // 接下来可以像普通的 Chronicle Queue 那样写入数据
            ExcerptAppender appender = queue.acquireAppender();
            appender.writeText("Replicated message");
        }
    }
}

通过这种方式,你可以将队列中的数据复制到其他节点上,实现数据的高可用性。


4、总结

Chronicle Queue 是一款为高性能需求而设计的轻量级日志和消息队列工具,提供了极低的延迟、持久化和高吞吐量的解决方案。通过内存映射文件的方式,将数据高效地持久化到磁盘中,可以满足对性能和稳定性要求极高的场景,尤其适合金融行业和实时数据系统。

🔥 高效读写:无论是读写性能还是时序日志的顺序访问,都具有非常高的效率。

🔥 高并发无锁设计:由于其设计是无锁的,因此非常适合在高并发场景下使用,极大减少了线程间的竞争。

🔥 数据持久化:队列数据虽然高效,但同时也会自动持久化到磁盘中,不会丢失重要的业务日志或消息数据。

在本文中,我们介绍了 Chronicle Queue 的核心特性,并通过具体的代码展示了如何在 Java 中使用 Chronicle Queue 搭建一个简单的消息队列。

动物装饰